Skip to content

Commit

Permalink
Add scaler metric latency support (kedacore#4040)
Browse files Browse the repository at this point in the history
Signed-off-by: kevin <tengkang@msn.com>
  • Loading branch information
kevinteng525 authored and JorTurFer committed Jan 9, 2023
1 parent bb9b083 commit 0e3e3d6
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

Here is an overview of all **stable** additions:

- **Prometheus Metrics**: Introduce scaler latency in Prometheus metrics. ([#4037](https://github.com/kedacore/keda/issues/4037))
- **General**: Introduce new ArangoDB Scaler ([#4000](https://github.com/kedacore/keda/issues/4000))

Here is an overview of all new **experimental** features:
Expand Down
17 changes: 16 additions & 1 deletion pkg/prommetrics/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ var (
},
metricLabels,
)
scalerMetricsLatency = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "scaler",
Name: "metrics_latency",
Help: "Scaler Metrics Latency",
},
metricLabels,
)
scalerErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: DefaultPromMetricsNamespace,
Expand All @@ -67,7 +76,7 @@ var (
scaledObjectErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "scaled",
Subsystem: "scaled_object",
Name: "errors",
Help: "Number of scaled object errors",
},
Expand Down Expand Up @@ -96,6 +105,7 @@ var (
func init() {
metrics.Registry.MustRegister(scalerErrorsTotal)
metrics.Registry.MustRegister(scalerMetricsValue)
metrics.Registry.MustRegister(scalerMetricsLatency)
metrics.Registry.MustRegister(scalerErrors)
metrics.Registry.MustRegister(scaledObjectErrors)

Expand All @@ -108,6 +118,11 @@ func RecordScalerMetric(namespace string, scaledObject string, scaler string, sc
scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value)
}

// RecordScalerLatency create a measurement of the latency to external metric
func RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value)
}

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,
}
// Filter only the desired metric
if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) {
metrics, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric)
metrics, _, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric)
metrics, err = fallback.GetMetricsWithFallback(ctx, p.client, logger, metrics, err, info.Metric, scaledObject, metricSpec)
if err != nil {
scalerError = true
Expand Down
16 changes: 9 additions & 7 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"time"

v2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -69,24 +70,25 @@ func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {
return result
}

// GetMetricsForScaler returns metric value for a scaler identified by the metric name
// GetMetricsForScaler returns metric value and latency for a scaler identified by the metric name
// and by the input index (from the list of scalers in this ScaledObject)
func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, error) {
func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, int64, error) {
if index < 0 || index >= len(c.Scalers) {
return nil, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
return nil, -1, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
}
startTime := time.Now()
m, _, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName)
if err == nil {
return m, nil
return m, time.Since(startTime).Milliseconds(), nil
}

ns, err := c.refreshScaler(ctx, index)
if err != nil {
return nil, err
return nil, -1, err
}

startTime = time.Now()
m, _, err = ns.GetMetricsAndActivity(ctx, metricName)
return m, err
return m, time.Since(startTime).Milliseconds(), err
}

// GetScaledObjectState returns whether the input ScaledObject is active as a first parameters,
Expand Down
6 changes: 5 additions & 1 deletion pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,11 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
}

if !metricsFoundInCache {
metrics, err = cache.GetMetricsForScaler(ctx, scalerIndex, metricName)
var latency int64
metrics, latency, err = cache.GetMetricsForScaler(ctx, scalerIndex, metricName)
if latency != -1 {
prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency))
}
h.logger.V(1).Info("Getting metrics from scaler", "scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName, "scaler", scalerName, "metricName", metricSpec.External.Metric.Name, "metrics", metrics, "scalerError", err)
}
metrics, err = fallback.GetMetricsWithFallback(ctx, h.client, h.logger, metrics, err, metricName, scaledObject, metricSpec)
Expand Down
31 changes: 28 additions & 3 deletions tests/internals/prometheus_metrics/prometheus_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
)

const (
testName = "prometheus-metrics-test"
testName = "prometheus-metrics-test"
labelScaledObject = "scaledObject"
)

var (
Expand Down Expand Up @@ -225,6 +226,7 @@ func TestScaler(t *testing.T) {
"replica count should be 2 after 2 minute")

testScalerMetricValue(t)
testScalerMetricLatency(t)
testMetricsServerScalerMetricValue(t)
testOperatorMetrics(t, kc, data)

Expand Down Expand Up @@ -274,7 +276,7 @@ func testScalerMetricValue(t *testing.T) {
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == "scaledObject" && *label.Value == scaledObjectName {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
assert.Equal(t, float64(4), *metric.Gauge.Value)
found = true
}
Expand All @@ -286,6 +288,29 @@ func testScalerMetricValue(t *testing.T) {
}
}

func testScalerMetricLatency(t *testing.T) {
t.Log("--- testing scaler metric latency ---")

family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL))

if val, ok := family["keda_scaler_metrics_latency"]; ok {
var found bool
metrics := val.GetMetric()
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
assert.Equal(t, float64(0), *metric.Gauge.Value)
found = true
}
}
}
assert.Equal(t, true, found)
} else {
t.Errorf("metric not available")
}
}

// [DEPRECATED] handle exporting Prometheus metrics from Operator to Metrics Server
func testMetricsServerScalerMetricValue(t *testing.T) {
t.Log("--- testing scaler metric value in metrics server ---")
Expand All @@ -298,7 +323,7 @@ func testMetricsServerScalerMetricValue(t *testing.T) {
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == "scaledObject" && *label.Value == scaledObjectName {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
assert.Equal(t, float64(4), *metric.Gauge.Value)
found = true
}
Expand Down

0 comments on commit 0e3e3d6

Please sign in to comment.