feat(metrics): histogram metric for loop latency #5812

Draft · wants to merge 3 commits into base: main
16 changes: 16 additions & 0 deletions pkg/metricscollector/opentelemetry.go
@@ -39,6 +39,7 @@ var (
otelScalerMetricsLatencyVals []OtelMetricFloat64Val
otelScalerMetricsLatencyValDeprecated []OtelMetricFloat64Val
otelInternalLoopLatencyVals []OtelMetricFloat64Val
otelInternalLoopLatencyValHistogram []OtelMetricFloat64Val
otelInternalLoopLatencyValDeprecated []OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val

@@ -170,6 +171,7 @@ func initMeters() {
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.internal.scale.loop.latency.seconds",
api.WithDescription("Internal latency of ScaledObject/ScaledJob loop execution"),
@@ -180,6 +182,15 @@
otLog.Error(err, msg)
}

_, err = meter.Float64Histogram(
"keda.internal.scale.loop.latency.bucket",
api.WithDescription("Internal latency of ScaledObject/ScaledJob loop execution"),
api.WithUnit("s"),
)
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.scaler.active",
api.WithDescription("Indicates whether a scaler is active (1), or not (0)"),
@@ -324,6 +335,11 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string,
otelInternalLoopLatencyD.val = float64(value.Milliseconds())
otelInternalLoopLatencyD.measurementOption = opt
otelInternalLoopLatencyValDeprecated = append(otelInternalLoopLatencyValDeprecated, otelInternalLoopLatencyD)

otelInternalLoopLatencyHistogram := OtelMetricFloat64Val{}
otelInternalLoopLatencyHistogram.val = value.Seconds()
otelInternalLoopLatencyHistogram.measurementOption = opt
otelInternalLoopLatencyValHistogram = append(otelInternalLoopLatencyValHistogram, otelInternalLoopLatencyHistogram)
}

func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error {
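Note on the OpenTelemetry change above: meter.Float64Histogram returns a synchronous instrument, so the returned handle is normally kept and recorded against at measurement time. As written in this diff, the instrument is discarded with `_`, and the values buffered in otelInternalLoopLatencyValHistogram have no synchronous instrument to flow into. A minimal sketch of the synchronous pattern, assuming the same `api` alias and measurement options used in this file (the variable and function names below are hypothetical, not from the PR):

package metricscollector

import (
	"context"
	"time"

	api "go.opentelemetry.io/otel/metric"
)

// Hypothetical handle kept from initMeters so measurements can be
// recorded synchronously instead of buffered in a slice.
var otelLoopLatencyHistogram api.Float64Histogram

func initLoopLatencyHistogram(meter api.Meter) error {
	var err error
	otelLoopLatencyHistogram, err = meter.Float64Histogram(
		"keda.internal.scale.loop.latency.bucket",
		api.WithDescription("Internal latency of ScaledObject/ScaledJob loop execution"),
		api.WithUnit("s"),
	)
	return err
}

// Hypothetical recorder, called from RecordScalableObjectLatency;
// opt carries the namespace, type, and resource attributes.
func recordLoopLatency(ctx context.Context, value time.Duration, opt api.MeasurementOption) {
	otelLoopLatencyHistogram.Record(ctx, value.Seconds(), opt)
}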
12 changes: 12 additions & 0 deletions pkg/metricscollector/prommetrics.go
@@ -210,6 +210,16 @@ var (
[]string{"namespace", "type", "resource"},
)

internalLoopLatencyHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "internal_scale_loop",
Name: "latency_seconds_bucket",
Help: "Total deviation (in seconds) between the expected execution time and the actual execution time for the scaling loop. Represented as a histogram.",
},
[]string{"namespace", "type", "resource"},
)

// Total emitted cloudevents.
cloudeventEmitted = prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -242,6 +252,7 @@ func NewPromMetrics() *PromMetrics {
metrics.Registry.MustRegister(scalerMetricsLatency)
metrics.Registry.MustRegister(internalLoopLatencyDeprecated)
metrics.Registry.MustRegister(internalLoopLatency)
metrics.Registry.MustRegister(internalLoopLatencyHistogram)
metrics.Registry.MustRegister(scalerActive)
metrics.Registry.MustRegister(scalerErrorsDeprecated)
metrics.Registry.MustRegister(scalerErrors)
@@ -284,6 +295,7 @@ func (p *PromMetrics) RecordScalerLatency(namespace string, scaledResource strin
func (p *PromMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value time.Duration) {
internalLoopLatency.WithLabelValues(namespace, getResourceType(isScaledObject), name).Set(value.Seconds())
internalLoopLatencyDeprecated.WithLabelValues(namespace, getResourceType(isScaledObject), name).Set(float64(value.Milliseconds()))
internalLoopLatencyHistogram.WithLabelValues(namespace, getResourceType(isScaledObject), name).Observe(value.Seconds())
}

// RecordScalerActive create a measurement of the activity of the scaler
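On the Prometheus side, the HistogramOpts above sets no Buckets field, so the vector falls back to prometheus.DefBuckets (0.005s up to 10s), and each Observe call feeds the family's _bucket, _sum, and _count series. Also note that naming the histogram latency_seconds_bucket means client_golang exposes series such as keda_internal_scale_loop_latency_seconds_bucket_bucket{le="..."}. A sketch of pinning explicit buckets instead; the bucket values are illustrative and not part of this PR:

package metricscollector

import "github.com/prometheus/client_golang/prometheus"

// Illustrative only: ten exponential buckets from 10ms up to 5.12s,
// plus the implicit +Inf bucket.
var internalLoopLatencyHistogramExplicit = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: DefaultPromMetricsNamespace,
		Subsystem: "internal_scale_loop",
		Name:      "latency_seconds",
		Help:      "Internal loop latency in seconds, as a histogram.",
		Buckets:   prometheus.ExponentialBuckets(0.01, 2, 10),
	},
	[]string{"namespace", "type", "resource"},
)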
32 changes: 31 additions & 1 deletion tests/sequential/prometheus_metrics/prometheus_metrics_test.go
@@ -21,7 +21,6 @@
"k8s.io/client-go/kubernetes"

"github.com/kedacore/keda/v2/pkg/metricscollector"
. "github.com/kedacore/keda/v2/tests/helper"
)

const (
@@ -429,13 +428,13 @@
t.Log("--- setting up ---")

// Create kubernetes resources
kc := GetKubernetesClient(t)

[Static Checks failure at line 431: undefined: GetKubernetesClient]
data, templates := getTemplateData()

CreateKubernetesResources(t, kc, testNamespace, data, templates)

[Static Checks failure at line 434: undefined: CreateKubernetesResources]

// scaling to max replica count to ensure the counter is registered before we test it
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 60, 2),
"replica count should be 2 after 2 minutes")
[Static Checks failure at line 437: undefined: WaitForDeploymentReplicaReadyCount]

testScalerMetricValue(t)
@@ -453,10 +452,10 @@
testCloudEventEmitted(t, data)
testCloudEventEmittedError(t, data)
// cleanup
DeleteKubernetesResources(t, testNamespace, data, templates)

[Static Checks failure at line 455: undefined: DeleteKubernetesResources]
}

func getTemplateData() (templateData, []Template) {

[Static Checks failure at line 458: undefined: Template]
return templateData{
TestName: testName,
TestNamespace: testNamespace,
@@ -474,7 +473,7 @@
CloudEventHTTPReceiverName: cloudEventHTTPReceiverName,
CloudEventHTTPServiceName: cloudEventHTTPServiceName,
CloudEventHTTPServiceURL: cloudEventHTTPServiceURL,
}, []Template{

[Static Checks failure at line 476: undefined: Template]
{Name: "deploymentTemplate", Config: deploymentTemplate},
{Name: "monitoredDeploymentTemplate", Config: monitoredDeploymentTemplate},
{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
@@ -487,7 +486,7 @@
}

func fetchAndParsePrometheusMetrics(t *testing.T, cmd string) map[string]*prommodel.MetricFamily {
out, _, err := ExecCommandOnSpecificPod(t, clientName, testNamespace, cmd)

[Static Checks failure at line 489: undefined: ExecCommandOnSpecificPod]
assert.NoErrorf(t, err, "cannot execute command - %s", err)

parser := expfmt.TextParser{}
@@ -526,9 +525,9 @@
func testScaledObjectErrors(t *testing.T, data templateData) {
t.Log("--- testing scaled object errors ---")

KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)

[Static Checks failure at line 528: undefined: KubectlDeleteWithTemplate]
time.Sleep(2 * time.Second)
KubectlApplyWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate)

[Static Checks failure at line 530: undefined: KubectlApplyWithTemplate]

// wait 20 seconds so the 2-second polling interval has time to produce errors
time.Sleep(20 * time.Second)
@@ -560,7 +559,7 @@
}
}

KubectlDeleteWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate)

[Static Checks failure at line 562: undefined: KubectlDeleteWithTemplate]
time.Sleep(2 * time.Second)
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
}
@@ -887,6 +886,37 @@
} else {
t.Errorf("scaledobject metric not available")
}
if val, ok := family["keda_internal_scale_loop_latency_seconds_bucket"]; ok {
var found bool
metrics := val.GetMetric()

// check scaledobject loop
found = false
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelType && *label.Value == "scaledobject" {
found = true
}
}
}
assert.Equal(t, true, found)

// check scaledjob loop
found = false
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelType && *label.Value == "scaledjob" {
found = true
}
}
}
assert.Equal(t, true, found)
} else {
t.Errorf("internal loop latency histogram metric not available")
}

}

func testScalerActiveMetric(t *testing.T) {
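The new assertions above only check that the histogram family carries scaledobject and scaledjob label values. Since expfmt parses full histogram state, a stricter check on recorded observations is also possible; a sketch using the same prommodel types from this test file (the helper name is hypothetical):

// Hypothetical helper: assert the loop latency histogram has recorded
// at least one observation for the given resource type.
func assertLoopLatencyObserved(t *testing.T, family map[string]*prommodel.MetricFamily, resourceType string) {
	val, ok := family["keda_internal_scale_loop_latency_seconds_bucket"]
	if !ok {
		t.Fatal("internal loop latency histogram family not exported")
	}
	for _, metric := range val.GetMetric() {
		for _, label := range metric.GetLabel() {
			if label.GetName() == labelType && label.GetValue() == resourceType {
				if h := metric.GetHistogram(); h != nil && h.GetSampleCount() > 0 {
					return // at least one observation recorded
				}
			}
		}
	}
	t.Errorf("no loop latency observations recorded for %s", resourceType)
}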