diff --git a/CHANGELOG.md b/CHANGELOG.md index 210aa550daa..9373c49499c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ Here is an overview of all new **experimental** features: ### Fixes - **General**: Fix CVE-2024-28180 in github.com/go-jose/go-jose/v3 ([#5617](https://github.com/kedacore/keda/pull/5617)) +- **General**: Fix wrong scaler active value and paused value that are pushed to OpenTelemetry ([#5705](https://github.com/kedacore/keda/issues/5705)) - **General**: Log field `ScaledJob` no longer have conflicting types ([#5592](https://github.com/kedacore/keda/pull/5592)) - **General**: Prometheus metrics shows errors correctly ([#5597](https://github.com/kedacore/keda/issues/5597)|[#5663](https://github.com/kedacore/keda/issues/5663)) - **General**: Validate empty array value of triggers in ScaledObject/ScaledJob creation ([#5520](https://github.com/kedacore/keda/issues/5520)) diff --git a/pkg/metricscollector/opentelemetry.go b/pkg/metricscollector/opentelemetry.go index ee1d3190185..2955bca5b88 100644 --- a/pkg/metricscollector/opentelemetry.go +++ b/pkg/metricscollector/opentelemetry.go @@ -33,17 +33,18 @@ var ( otTriggerRegisteredTotalsCounter api.Int64UpDownCounter otCrdRegisteredTotalsCounter api.Int64UpDownCounter - otelScalerMetricVal OtelMetricFloat64Val - otelScalerMetricsLatencyVal OtelMetricFloat64Val - otelScalerMetricsLatencyValDeprecated OtelMetricFloat64Val - otelInternalLoopLatencyVal OtelMetricFloat64Val - otelInternalLoopLatencyValDeprecated OtelMetricFloat64Val + otelScalerMetricVals []OtelMetricFloat64Val + otelScalerMetricsLatencyVals []OtelMetricFloat64Val + otelScalerMetricsLatencyValDeprecated []OtelMetricFloat64Val + otelInternalLoopLatencyVals []OtelMetricFloat64Val + otelInternalLoopLatencyValDeprecated []OtelMetricFloat64Val otelBuildInfoVal OtelMetricInt64Val - otCloudEventEmittedCounter api.Int64Counter - otCloudEventQueueStatusVal OtelMetricFloat64Val + otCloudEventEmittedCounter api.Int64Counter + otCloudEventQueueStatusVals []OtelMetricFloat64Val - otelScalerActiveVal OtelMetricFloat64Val + otelScalerActiveVals []OtelMetricFloat64Val + otelScalerPauseVals []OtelMetricFloat64Val ) type OtelMetrics struct { @@ -196,6 +197,15 @@ func initMeters() { if err != nil { otLog.Error(err, msg) } + + _, err = meter.Float64ObservableGauge( + "keda.scaled.object.paused", + api.WithDescription("Indicates whether a ScaledObject is paused"), + api.WithFloat64Callback(PausedStatusCallback), + ) + if err != nil { + otLog.Error(err, msg) + } } func BuildInfoCallback(_ context.Context, obsrv api.Int64Observer) error { @@ -220,55 +230,62 @@ func (o *OtelMetrics) RecordBuildInfo() { } func ScalerMetricValueCallback(_ context.Context, obsrv api.Float64Observer) error { - if otelScalerMetricVal.measurementOption != nil { - obsrv.Observe(otelScalerMetricVal.val, otelScalerMetricVal.measurementOption) + for _, v := range otelScalerMetricVals { + obsrv.Observe(v.val, v.measurementOption) } - otelScalerMetricVal = OtelMetricFloat64Val{} + otelScalerMetricVals = []OtelMetricFloat64Val{} return nil } func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) { - otelScalerMetricVal.val = value - otelScalerMetricVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject) + otelScalerMetric := OtelMetricFloat64Val{} + otelScalerMetric.val = value + otelScalerMetric.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject) + otelScalerMetricVals = append(otelScalerMetricVals, otelScalerMetric) } func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer) error { - if otelScalerMetricsLatencyVal.measurementOption != nil { - obsrv.Observe(otelScalerMetricsLatencyVal.val, otelScalerMetricsLatencyVal.measurementOption) + for _, v := range otelScalerMetricsLatencyVals { + obsrv.Observe(v.val, v.measurementOption) } - otelScalerMetricsLatencyVal = OtelMetricFloat64Val{} + otelScalerMetricsLatencyVals = []OtelMetricFloat64Val{} return nil } func ScalerMetricsLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64Observer) error { - if otelScalerMetricsLatencyValDeprecated.measurementOption != nil { - obsrv.Observe(otelScalerMetricsLatencyValDeprecated.val, otelScalerMetricsLatencyValDeprecated.measurementOption) + for _, v := range otelScalerMetricsLatencyValDeprecated { + obsrv.Observe(v.val, v.measurementOption) } - otelScalerMetricsLatencyValDeprecated = OtelMetricFloat64Val{} + otelScalerMetricsLatencyValDeprecated = []OtelMetricFloat64Val{} return nil } // RecordScalerLatency create a measurement of the latency to external metric func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration) { - otelScalerMetricsLatencyVal.val = value.Seconds() - otelScalerMetricsLatencyVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject) - otelScalerMetricsLatencyValDeprecated.val = float64(value.Milliseconds()) - otelScalerMetricsLatencyValDeprecated.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject) + otelScalerMetricsLatency := OtelMetricFloat64Val{} + otelScalerMetricsLatency.val = value.Seconds() + otelScalerMetricsLatency.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject) + otelScalerMetricsLatencyVals = append(otelScalerMetricsLatencyVals, otelScalerMetricsLatency) + + otelScalerMetricsLatencyValD := OtelMetricFloat64Val{} + otelScalerMetricsLatencyValD.val = float64(value.Milliseconds()) + otelScalerMetricsLatencyValD.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject) + otelScalerMetricsLatencyValDeprecated = append(otelScalerMetricsLatencyValDeprecated, otelScalerMetricsLatencyValD) } func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error { - if otelInternalLoopLatencyVal.measurementOption != nil { - obsrv.Observe(otelInternalLoopLatencyVal.val, otelInternalLoopLatencyVal.measurementOption) + for _, v := range otelInternalLoopLatencyVals { + obsrv.Observe(v.val, v.measurementOption) } - otelInternalLoopLatencyVal = OtelMetricFloat64Val{} + otelInternalLoopLatencyVals = []OtelMetricFloat64Val{} return nil } func ScalableObjectLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64Observer) error { - if otelInternalLoopLatencyValDeprecated.measurementOption != nil { - obsrv.Observe(otelInternalLoopLatencyValDeprecated.val, otelInternalLoopLatencyValDeprecated.measurementOption) + for _, v := range otelInternalLoopLatencyValDeprecated { + obsrv.Observe(v.val, v.measurementOption) } - otelInternalLoopLatencyValDeprecated = OtelMetricFloat64Val{} + otelInternalLoopLatencyValDeprecated = []OtelMetricFloat64Val{} return nil } @@ -284,29 +301,43 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string, attribute.Key("type").String(resourceType), attribute.Key("name").String(name)) - otelInternalLoopLatencyVal.val = value.Seconds() - otelInternalLoopLatencyVal.measurementOption = opt - otelInternalLoopLatencyValDeprecated.val = float64(value.Milliseconds()) - otelInternalLoopLatencyValDeprecated.measurementOption = opt + otelInternalLoopLatency := OtelMetricFloat64Val{} + otelInternalLoopLatency.val = value.Seconds() + otelInternalLoopLatency.measurementOption = opt + otelInternalLoopLatencyVals = append(otelInternalLoopLatencyVals, otelInternalLoopLatency) + + otelInternalLoopLatencyD := OtelMetricFloat64Val{} + otelInternalLoopLatencyD.val = float64(value.Milliseconds()) + otelInternalLoopLatencyD.measurementOption = opt + otelInternalLoopLatencyValDeprecated = append(otelInternalLoopLatencyValDeprecated, otelInternalLoopLatencyD) } func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error { - if otelScalerActiveVal.measurementOption != nil { - obsrv.Observe(otelScalerActiveVal.val, otelScalerActiveVal.measurementOption) + for _, v := range otelScalerActiveVals { + obsrv.Observe(v.val, v.measurementOption) } - otelScalerActiveVal = OtelMetricFloat64Val{} + otelScalerActiveVals = []OtelMetricFloat64Val{} return nil } // RecordScalerActive create a measurement of the activity of the scaler func (o *OtelMetrics) RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) { - activeVal := -1 + activeVal := 0 if active { activeVal = 1 } + otelScalerActive := OtelMetricFloat64Val{} + otelScalerActive.val = float64(activeVal) + otelScalerActive.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject) + otelScalerActiveVals = append(otelScalerActiveVals, otelScalerActive) +} - otelScalerActiveVal.val = float64(activeVal) - otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject) +func PausedStatusCallback(_ context.Context, obsrv api.Float64Observer) error { + for _, v := range otelScalerPauseVals { + obsrv.Observe(v.val, v.measurementOption) + } + otelScalerPauseVals = []OtelMetricFloat64Val{} + return nil } // RecordScaledObjectPaused marks whether the current ScaledObject is paused. @@ -318,21 +349,12 @@ func (o *OtelMetrics) RecordScaledObjectPaused(namespace string, scaledObject st opt := api.WithAttributes( attribute.Key("namespace").String(namespace), - attribute.Key("scaledObject").String(scaledObject), - ) + attribute.Key("scaledObject").String(scaledObject)) - cback := func(_ context.Context, obsrv api.Float64Observer) error { - obsrv.Observe(float64(activeVal), opt) - return nil - } - _, err := meter.Float64ObservableGauge( - "keda.scaled.object.paused", - api.WithDescription("Indicates whether a ScaledObject is paused"), - api.WithFloat64Callback(cback), - ) - if err != nil { - otLog.Error(err, "failed to register scaled object paused metric", "namespace", namespace, "scaledObject", scaledObject) - } + otelScalerPause := OtelMetricFloat64Val{} + otelScalerPause.val = float64(activeVal) + otelScalerPause.measurementOption = opt + otelScalerPauseVals = append(otelScalerPauseVals, otelScalerPause) } // RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA @@ -448,10 +470,10 @@ func (o *OtelMetrics) RecordCloudEventEmittedError(namespace string, cloudevents } func CloudeventQueueStatusCallback(_ context.Context, obsrv api.Float64Observer) error { - if otCloudEventQueueStatusVal.measurementOption != nil { - obsrv.Observe(otCloudEventQueueStatusVal.val, otCloudEventQueueStatusVal.measurementOption) + for _, v := range otCloudEventQueueStatusVals { + obsrv.Observe(v.val, v.measurementOption) } - otCloudEventQueueStatusVal = OtelMetricFloat64Val{} + otCloudEventQueueStatusVals = []OtelMetricFloat64Val{} return nil } @@ -461,6 +483,8 @@ func (o *OtelMetrics) RecordCloudEventQueueStatus(namespace string, value int) { attribute.Key("namespace").String(namespace), ) - otCloudEventQueueStatusVal.val = float64(value) - otCloudEventQueueStatusVal.measurementOption = opt + otCloudEventQueueStatus := OtelMetricFloat64Val{} + otCloudEventQueueStatus.val = float64(value) + otCloudEventQueueStatus.measurementOption = opt + otCloudEventQueueStatusVals = append(otCloudEventQueueStatusVals, otCloudEventQueueStatus) } diff --git a/pkg/metricscollector/opentelemetry_test.go b/pkg/metricscollector/opentelemetry_test.go index 82f3071be49..3cd2616f001 100644 --- a/pkg/metricscollector/opentelemetry_test.go +++ b/pkg/metricscollector/opentelemetry_test.go @@ -102,3 +102,56 @@ func TestLoopLatency(t *testing.T) { data = latencySeconds.Data.(metricdata.Gauge[float64]).DataPoints[0] assert.Equal(t, data.Value, float64(0.5)) } + +func TestContinuousMetrics(t *testing.T) { + testOtel.RecordScalerActive("testnamespace", "testresource", "testscaler", 0, "testmetric", true, true) + testOtel.RecordScalerActive("testnamespace2", "testresource2", "testscaler2", 0, "testmetric", false, false) + got := metricdata.ResourceMetrics{} + err := testReader.Collect(context.Background(), &got) + + assert.Nil(t, err) + scopeMetrics := got.ScopeMetrics[0] + assert.NotEqual(t, len(scopeMetrics.Metrics), 0) + activeMetric := retrieveMetric(scopeMetrics.Metrics, "keda.scaler.active") + + assert.NotNil(t, buildInfo) + + dataPoints := activeMetric.Data.(metricdata.Gauge[float64]).DataPoints + assert.Len(t, dataPoints, 2) + + var scaledObjectMetric metricdata.DataPoint[float64] + for _, v := range dataPoints { + attribute, _ := v.Attributes.Value("namespace") + if attribute.AsString() == "testnamespace" { + scaledObjectMetric = v + } + } + + assert.NotEqual(t, scaledObjectMetric, metricdata.DataPoint[float64]{}) + attribute, _ := scaledObjectMetric.Attributes.Value("scaledObject") + assert.Equal(t, attribute.AsString(), "testresource") + attribute, _ = scaledObjectMetric.Attributes.Value("scaler") + assert.Equal(t, attribute.AsString(), "testscaler") + attribute, _ = scaledObjectMetric.Attributes.Value("metric") + assert.Equal(t, attribute.AsString(), "testmetric") + assert.Equal(t, scaledObjectMetric.Value, 1.0) + + var scaledJobMetric metricdata.DataPoint[float64] + for _, v := range dataPoints { + attribute, _ := v.Attributes.Value("namespace") + if attribute.AsString() == "testnamespace2" { + scaledJobMetric = v + } + } + + assert.NotEqual(t, scaledJobMetric, metricdata.DataPoint[float64]{}) + attribute, _ = scaledJobMetric.Attributes.Value("namespace") + assert.Equal(t, attribute.AsString(), "testnamespace2") + attribute, _ = scaledJobMetric.Attributes.Value("scaledJob") + assert.Equal(t, attribute.AsString(), "testresource2") + attribute, _ = scaledJobMetric.Attributes.Value("scaler") + assert.Equal(t, attribute.AsString(), "testscaler2") + attribute, _ = scaledJobMetric.Attributes.Value("metric") + assert.Equal(t, attribute.AsString(), "testmetric") + assert.Equal(t, scaledJobMetric.Value, 0.0) +} diff --git a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go index 2b7309776db..c8d185a82d4 100644 --- a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go +++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go @@ -450,7 +450,7 @@ func TestOpenTelemetryMetrics(t *testing.T) { testScalerMetricValue(t) testScalerMetricLatency(t) - testScalerActiveMetric(t) + testScalerActiveMetric(t, kc) testScaledObjectErrors(t, data) testScaledJobErrors(t, data) testScalerErrors(t, data) @@ -748,28 +748,20 @@ func testScalableObjectMetrics(t *testing.T) { } } -func testScalerActiveMetric(t *testing.T) { +func testScalerActiveMetric(t *testing.T, kc *kubernetes.Clientset) { t.Log("--- testing scaler active metric ---") - family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) + families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) + assertScaledObjectFlagMetric(t, families, scaledObjectName, "keda_scaler_active", true) - val, ok := family["keda_scaler_active"] - assert.True(t, ok, "keda_scaler_active not available") - if ok { - var found bool - metrics := val.GetMetric() - for _, metric := range metrics { - labels := metric.GetLabel() - for _, label := range labels { - if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) || - (*label.Name == labelScaledJob && *label.Value == scaledJobName) { - assert.Equal(t, float64(1), *metric.Gauge.Value) - found = true - } - } - } - assert.Equal(t, true, found) - } + t.Log("--- testing scaler active metric scaled down ---") + KubernetesScaleDeployment(t, kc, monitoredDeploymentName, 0, testNamespace) + WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 2) + time.Sleep(10 * time.Second) + families = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) + + assertScaledObjectFlagMetric(t, families, scaledObjectName, "keda_scaler_active", false) + KubernetesScaleDeployment(t, kc, monitoredDeploymentName, 4, testNamespace) } func testScaledObjectPausedMetric(t *testing.T, data templateData) { @@ -781,7 +773,7 @@ func testScaledObjectPausedMetric(t *testing.T, data templateData) { time.Sleep(20 * time.Second) // Check that the paused metric is now true families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) - assertScaledObjectPausedMetric(t, families, scaledObjectName, true) + assertScaledObjectFlagMetric(t, families, scaledObjectName, "keda_scaled_object_paused", true) // Unpause the ScaledObject KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) @@ -789,7 +781,7 @@ func testScaledObjectPausedMetric(t *testing.T, data templateData) { time.Sleep(20 * time.Second) // Check that the paused metric is back to false families = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) - assertScaledObjectPausedMetric(t, families, scaledObjectName, false) + assertScaledObjectFlagMetric(t, families, scaledObjectName, "keda_scaled_object_paused", false) } func testOperatorMetrics(t *testing.T, kc *kubernetes.Clientset, data templateData) { @@ -970,9 +962,9 @@ func checkCRTotalValues(t *testing.T, families map[string]*prommodel.MetricFamil } } -func assertScaledObjectPausedMetric(t *testing.T, families map[string]*prommodel.MetricFamily, scaledObjectName string, expected bool) { - family, ok := families["keda_scaled_object_paused"] - assert.True(t, ok, "keda_scaled_object_paused not available") +func assertScaledObjectFlagMetric(t *testing.T, families map[string]*prommodel.MetricFamily, scaledObjectName string, metricName string, expected bool) { + family, ok := families[metricName] + assert.True(t, ok, "%s not available", metricName) if !ok { return } @@ -980,6 +972,7 @@ func assertScaledObjectPausedMetric(t *testing.T, families map[string]*prommodel metricValue := 0.0 metrics := family.GetMetric() for _, metric := range metrics { + t.Log("scaledobject flag metric detail info ---", "metric", metric, "scaledObjectName", scaledObjectName, "metricName", metricName) labels := metric.GetLabel() for _, label := range labels { if *label.Name == labelScaledObject && *label.Value == scaledObjectName {