Skip to content

Commit

Permalink
Fix: fix the wrong value of scaler active and the failure of paused v…
Browse files Browse the repository at this point in the history
…alue in Opentelemetry (#5704)

Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>
  • Loading branch information
SpiritZhou committed Apr 23, 2024
1 parent 9122323 commit 9a00d1e
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Here is an overview of all new **experimental** features:
### Fixes

- **General**: Fix CVE-2024-28180 in github.com/go-jose/go-jose/v3 ([#5617](https://github.com/kedacore/keda/pull/5617))
- **General**: Fix wrong scaler active value and paused value that are pushed to OpenTelemetry ([#5705](https://github.com/kedacore/keda/issues/5705))
- **General**: Log field `ScaledJob` no longer have conflicting types ([#5592](https://github.com/kedacore/keda/pull/5592))
- **General**: Prometheus metrics shows errors correctly ([#5597](https://github.com/kedacore/keda/issues/5597)|[#5663](https://github.com/kedacore/keda/issues/5663))
- **General**: Validate empty array value of triggers in ScaledObject/ScaledJob creation ([#5520](https://github.com/kedacore/keda/issues/5520))
Expand Down
140 changes: 82 additions & 58 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ var (
otTriggerRegisteredTotalsCounter api.Int64UpDownCounter
otCrdRegisteredTotalsCounter api.Int64UpDownCounter

otelScalerMetricVal OtelMetricFloat64Val
otelScalerMetricsLatencyVal OtelMetricFloat64Val
otelScalerMetricsLatencyValDeprecated OtelMetricFloat64Val
otelInternalLoopLatencyVal OtelMetricFloat64Val
otelInternalLoopLatencyValDeprecated OtelMetricFloat64Val
otelScalerMetricVals []OtelMetricFloat64Val
otelScalerMetricsLatencyVals []OtelMetricFloat64Val
otelScalerMetricsLatencyValDeprecated []OtelMetricFloat64Val
otelInternalLoopLatencyVals []OtelMetricFloat64Val
otelInternalLoopLatencyValDeprecated []OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val

otCloudEventEmittedCounter api.Int64Counter
otCloudEventQueueStatusVal OtelMetricFloat64Val
otCloudEventEmittedCounter api.Int64Counter
otCloudEventQueueStatusVals []OtelMetricFloat64Val

otelScalerActiveVal OtelMetricFloat64Val
otelScalerActiveVals []OtelMetricFloat64Val
otelScalerPauseVals []OtelMetricFloat64Val
)

type OtelMetrics struct {
Expand Down Expand Up @@ -196,6 +197,15 @@ func initMeters() {
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.scaled.object.paused",
api.WithDescription("Indicates whether a ScaledObject is paused"),
api.WithFloat64Callback(PausedStatusCallback),
)
if err != nil {
otLog.Error(err, msg)
}
}

func BuildInfoCallback(_ context.Context, obsrv api.Int64Observer) error {
Expand All @@ -220,55 +230,62 @@ func (o *OtelMetrics) RecordBuildInfo() {
}

func ScalerMetricValueCallback(_ context.Context, obsrv api.Float64Observer) error {
if otelScalerMetricVal.measurementOption != nil {
obsrv.Observe(otelScalerMetricVal.val, otelScalerMetricVal.measurementOption)
for _, v := range otelScalerMetricVals {
obsrv.Observe(v.val, v.measurementOption)
}
otelScalerMetricVal = OtelMetricFloat64Val{}
otelScalerMetricVals = []OtelMetricFloat64Val{}
return nil
}

func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
otelScalerMetricVal.val = value
otelScalerMetricVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetric := OtelMetricFloat64Val{}
otelScalerMetric.val = value
otelScalerMetric.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetricVals = append(otelScalerMetricVals, otelScalerMetric)
}

func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
if otelScalerMetricsLatencyVal.measurementOption != nil {
obsrv.Observe(otelScalerMetricsLatencyVal.val, otelScalerMetricsLatencyVal.measurementOption)
for _, v := range otelScalerMetricsLatencyVals {
obsrv.Observe(v.val, v.measurementOption)
}
otelScalerMetricsLatencyVal = OtelMetricFloat64Val{}
otelScalerMetricsLatencyVals = []OtelMetricFloat64Val{}
return nil
}

func ScalerMetricsLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64Observer) error {
if otelScalerMetricsLatencyValDeprecated.measurementOption != nil {
obsrv.Observe(otelScalerMetricsLatencyValDeprecated.val, otelScalerMetricsLatencyValDeprecated.measurementOption)
for _, v := range otelScalerMetricsLatencyValDeprecated {
obsrv.Observe(v.val, v.measurementOption)
}
otelScalerMetricsLatencyValDeprecated = OtelMetricFloat64Val{}
otelScalerMetricsLatencyValDeprecated = []OtelMetricFloat64Val{}
return nil
}

// RecordScalerLatency create a measurement of the latency to external metric
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration) {
otelScalerMetricsLatencyVal.val = value.Seconds()
otelScalerMetricsLatencyVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetricsLatencyValDeprecated.val = float64(value.Milliseconds())
otelScalerMetricsLatencyValDeprecated.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetricsLatency := OtelMetricFloat64Val{}
otelScalerMetricsLatency.val = value.Seconds()
otelScalerMetricsLatency.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetricsLatencyVals = append(otelScalerMetricsLatencyVals, otelScalerMetricsLatency)

otelScalerMetricsLatencyValD := OtelMetricFloat64Val{}
otelScalerMetricsLatencyValD.val = float64(value.Milliseconds())
otelScalerMetricsLatencyValD.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerMetricsLatencyValDeprecated = append(otelScalerMetricsLatencyValDeprecated, otelScalerMetricsLatencyValD)
}

func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
if otelInternalLoopLatencyVal.measurementOption != nil {
obsrv.Observe(otelInternalLoopLatencyVal.val, otelInternalLoopLatencyVal.measurementOption)
for _, v := range otelInternalLoopLatencyVals {
obsrv.Observe(v.val, v.measurementOption)
}
otelInternalLoopLatencyVal = OtelMetricFloat64Val{}
otelInternalLoopLatencyVals = []OtelMetricFloat64Val{}
return nil
}

func ScalableObjectLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64Observer) error {
if otelInternalLoopLatencyValDeprecated.measurementOption != nil {
obsrv.Observe(otelInternalLoopLatencyValDeprecated.val, otelInternalLoopLatencyValDeprecated.measurementOption)
for _, v := range otelInternalLoopLatencyValDeprecated {
obsrv.Observe(v.val, v.measurementOption)
}
otelInternalLoopLatencyValDeprecated = OtelMetricFloat64Val{}
otelInternalLoopLatencyValDeprecated = []OtelMetricFloat64Val{}
return nil
}

Expand All @@ -284,29 +301,43 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string,
attribute.Key("type").String(resourceType),
attribute.Key("name").String(name))

otelInternalLoopLatencyVal.val = value.Seconds()
otelInternalLoopLatencyVal.measurementOption = opt
otelInternalLoopLatencyValDeprecated.val = float64(value.Milliseconds())
otelInternalLoopLatencyValDeprecated.measurementOption = opt
otelInternalLoopLatency := OtelMetricFloat64Val{}
otelInternalLoopLatency.val = value.Seconds()
otelInternalLoopLatency.measurementOption = opt
otelInternalLoopLatencyVals = append(otelInternalLoopLatencyVals, otelInternalLoopLatency)

otelInternalLoopLatencyD := OtelMetricFloat64Val{}
otelInternalLoopLatencyD.val = float64(value.Milliseconds())
otelInternalLoopLatencyD.measurementOption = opt
otelInternalLoopLatencyValDeprecated = append(otelInternalLoopLatencyValDeprecated, otelInternalLoopLatencyD)
}

func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error {
if otelScalerActiveVal.measurementOption != nil {
obsrv.Observe(otelScalerActiveVal.val, otelScalerActiveVal.measurementOption)
for _, v := range otelScalerActiveVals {
obsrv.Observe(v.val, v.measurementOption)
}
otelScalerActiveVal = OtelMetricFloat64Val{}
otelScalerActiveVals = []OtelMetricFloat64Val{}
return nil
}

// RecordScalerActive create a measurement of the activity of the scaler
func (o *OtelMetrics) RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) {
activeVal := -1
activeVal := 0
if active {
activeVal = 1
}
otelScalerActive := OtelMetricFloat64Val{}
otelScalerActive.val = float64(activeVal)
otelScalerActive.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
otelScalerActiveVals = append(otelScalerActiveVals, otelScalerActive)
}

otelScalerActiveVal.val = float64(activeVal)
otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
func PausedStatusCallback(_ context.Context, obsrv api.Float64Observer) error {
for _, v := range otelScalerPauseVals {
obsrv.Observe(v.val, v.measurementOption)
}
otelScalerPauseVals = []OtelMetricFloat64Val{}
return nil
}

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
Expand All @@ -318,21 +349,12 @@ func (o *OtelMetrics) RecordScaledObjectPaused(namespace string, scaledObject st

opt := api.WithAttributes(
attribute.Key("namespace").String(namespace),
attribute.Key("scaledObject").String(scaledObject),
)
attribute.Key("scaledObject").String(scaledObject))

cback := func(_ context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(float64(activeVal), opt)
return nil
}
_, err := meter.Float64ObservableGauge(
"keda.scaled.object.paused",
api.WithDescription("Indicates whether a ScaledObject is paused"),
api.WithFloat64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register scaled object paused metric", "namespace", namespace, "scaledObject", scaledObject)
}
otelScalerPause := OtelMetricFloat64Val{}
otelScalerPause.val = float64(activeVal)
otelScalerPause.measurementOption = opt
otelScalerPauseVals = append(otelScalerPauseVals, otelScalerPause)
}

// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
Expand Down Expand Up @@ -448,10 +470,10 @@ func (o *OtelMetrics) RecordCloudEventEmittedError(namespace string, cloudevents
}

func CloudeventQueueStatusCallback(_ context.Context, obsrv api.Float64Observer) error {
if otCloudEventQueueStatusVal.measurementOption != nil {
obsrv.Observe(otCloudEventQueueStatusVal.val, otCloudEventQueueStatusVal.measurementOption)
for _, v := range otCloudEventQueueStatusVals {
obsrv.Observe(v.val, v.measurementOption)
}
otCloudEventQueueStatusVal = OtelMetricFloat64Val{}
otCloudEventQueueStatusVals = []OtelMetricFloat64Val{}
return nil
}

Expand All @@ -461,6 +483,8 @@ func (o *OtelMetrics) RecordCloudEventQueueStatus(namespace string, value int) {
attribute.Key("namespace").String(namespace),
)

otCloudEventQueueStatusVal.val = float64(value)
otCloudEventQueueStatusVal.measurementOption = opt
otCloudEventQueueStatus := OtelMetricFloat64Val{}
otCloudEventQueueStatus.val = float64(value)
otCloudEventQueueStatus.measurementOption = opt
otCloudEventQueueStatusVals = append(otCloudEventQueueStatusVals, otCloudEventQueueStatus)
}
53 changes: 53 additions & 0 deletions pkg/metricscollector/opentelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,56 @@ func TestLoopLatency(t *testing.T) {
data = latencySeconds.Data.(metricdata.Gauge[float64]).DataPoints[0]
assert.Equal(t, data.Value, float64(0.5))
}

func TestContinuousMetrics(t *testing.T) {
testOtel.RecordScalerActive("testnamespace", "testresource", "testscaler", 0, "testmetric", true, true)
testOtel.RecordScalerActive("testnamespace2", "testresource2", "testscaler2", 0, "testmetric", false, false)
got := metricdata.ResourceMetrics{}
err := testReader.Collect(context.Background(), &got)

assert.Nil(t, err)
scopeMetrics := got.ScopeMetrics[0]
assert.NotEqual(t, len(scopeMetrics.Metrics), 0)
activeMetric := retrieveMetric(scopeMetrics.Metrics, "keda.scaler.active")

assert.NotNil(t, buildInfo)

dataPoints := activeMetric.Data.(metricdata.Gauge[float64]).DataPoints
assert.Len(t, dataPoints, 2)

var scaledObjectMetric metricdata.DataPoint[float64]
for _, v := range dataPoints {
attribute, _ := v.Attributes.Value("namespace")
if attribute.AsString() == "testnamespace" {
scaledObjectMetric = v
}
}

assert.NotEqual(t, scaledObjectMetric, metricdata.DataPoint[float64]{})
attribute, _ := scaledObjectMetric.Attributes.Value("scaledObject")
assert.Equal(t, attribute.AsString(), "testresource")
attribute, _ = scaledObjectMetric.Attributes.Value("scaler")
assert.Equal(t, attribute.AsString(), "testscaler")
attribute, _ = scaledObjectMetric.Attributes.Value("metric")
assert.Equal(t, attribute.AsString(), "testmetric")
assert.Equal(t, scaledObjectMetric.Value, 1.0)

var scaledJobMetric metricdata.DataPoint[float64]
for _, v := range dataPoints {
attribute, _ := v.Attributes.Value("namespace")
if attribute.AsString() == "testnamespace2" {
scaledJobMetric = v
}
}

assert.NotEqual(t, scaledJobMetric, metricdata.DataPoint[float64]{})
attribute, _ = scaledJobMetric.Attributes.Value("namespace")
assert.Equal(t, attribute.AsString(), "testnamespace2")
attribute, _ = scaledJobMetric.Attributes.Value("scaledJob")
assert.Equal(t, attribute.AsString(), "testresource2")
attribute, _ = scaledJobMetric.Attributes.Value("scaler")
assert.Equal(t, attribute.AsString(), "testscaler2")
attribute, _ = scaledJobMetric.Attributes.Value("metric")
assert.Equal(t, attribute.AsString(), "testmetric")
assert.Equal(t, scaledJobMetric.Value, 0.0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func TestOpenTelemetryMetrics(t *testing.T) {

testScalerMetricValue(t)
testScalerMetricLatency(t)
testScalerActiveMetric(t)
testScalerActiveMetric(t, kc)
testScaledObjectErrors(t, data)
testScaledJobErrors(t, data)
testScalerErrors(t, data)
Expand Down Expand Up @@ -748,28 +748,20 @@ func testScalableObjectMetrics(t *testing.T) {
}
}

func testScalerActiveMetric(t *testing.T) {
func testScalerActiveMetric(t *testing.T, kc *kubernetes.Clientset) {
t.Log("--- testing scaler active metric ---")

family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
assertScaledObjectFlagMetric(t, families, scaledObjectName, "keda_scaler_active", true)

val, ok := family["keda_scaler_active"]
assert.True(t, ok, "keda_scaler_active not available")
if ok {
var found bool
metrics := val.GetMetric()
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) ||
(*label.Name == labelScaledJob && *label.Value == scaledJobName) {
assert.Equal(t, float64(1), *metric.Gauge.Value)
found = true
}
}
}
assert.Equal(t, true, found)
}
t.Log("--- testing scaler active metric scaled down ---")
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, 0, testNamespace)
WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 2)
time.Sleep(10 * time.Second)
families = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))

assertScaledObjectFlagMetric(t, families, scaledObjectName, "keda_scaler_active", false)
KubernetesScaleDeployment(t, kc, monitoredDeploymentName, 4, testNamespace)
}

func testScaledObjectPausedMetric(t *testing.T, data templateData) {
Expand All @@ -781,15 +773,15 @@ func testScaledObjectPausedMetric(t *testing.T, data templateData) {
time.Sleep(20 * time.Second)
// Check that the paused metric is now true
families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
assertScaledObjectPausedMetric(t, families, scaledObjectName, true)
assertScaledObjectFlagMetric(t, families, scaledObjectName, "keda_scaled_object_paused", true)

// Unpause the ScaledObject
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)

time.Sleep(20 * time.Second)
// Check that the paused metric is back to false
families = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
assertScaledObjectPausedMetric(t, families, scaledObjectName, false)
assertScaledObjectFlagMetric(t, families, scaledObjectName, "keda_scaled_object_paused", false)
}

func testOperatorMetrics(t *testing.T, kc *kubernetes.Clientset, data templateData) {
Expand Down Expand Up @@ -970,16 +962,17 @@ func checkCRTotalValues(t *testing.T, families map[string]*prommodel.MetricFamil
}
}

func assertScaledObjectPausedMetric(t *testing.T, families map[string]*prommodel.MetricFamily, scaledObjectName string, expected bool) {
family, ok := families["keda_scaled_object_paused"]
assert.True(t, ok, "keda_scaled_object_paused not available")
func assertScaledObjectFlagMetric(t *testing.T, families map[string]*prommodel.MetricFamily, scaledObjectName string, metricName string, expected bool) {
family, ok := families[metricName]
assert.True(t, ok, "%s not available", metricName)
if !ok {
return
}

metricValue := 0.0
metrics := family.GetMetric()
for _, metric := range metrics {
t.Log("scaledobject flag metric detail info ---", "metric", metric, "scaledObjectName", scaledObjectName, "metricName", metricName)
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
Expand Down

0 comments on commit 9a00d1e

Please sign in to comment.