Skip to content

Commit

Permalink
move the logic for measuring latency inside cache.GetMetricsForScaler()
Browse files Browse the repository at this point in the history
move the logic for measuring latency inside cache.GetMetricsForScaler()

Signed-off-by: kevin <tengkang@msn.com>
  • Loading branch information
kevinteng525 committed Jan 6, 2023
1 parent fad3925 commit c62d21a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,
}
// Filter only the desired metric
if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) {
metrics, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric)
metrics, _, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric)
metrics, err = fallback.GetMetricsWithFallback(ctx, p.client, logger, metrics, err, info.Metric, scaledObject, metricSpec)
if err != nil {
scalerError = true
Expand Down
16 changes: 9 additions & 7 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"time"

v2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -69,24 +70,25 @@ func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {
return result
}

// GetMetricsForScaler returns metric value for a scaler identified by the metric name
// GetMetricsForScaler returns metric value and latency for a scaler identified by the metric name
// and by the input index (from the list of scalers in this ScaledObject)
func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, error) {
func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, int64, error) {
if index < 0 || index >= len(c.Scalers) {
return nil, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
return nil, -1, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
}
startTime := time.Now()
m, _, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName)
if err == nil {
return m, nil
return m, time.Since(startTime).Milliseconds(), nil
}

ns, err := c.refreshScaler(ctx, index)
if err != nil {
return nil, err
return nil, -1, err
}

startTime = time.Now()
m, _, err = ns.GetMetricsAndActivity(ctx, metricName)
return m, err
return m, time.Since(startTime).Milliseconds(), err
}

// GetScaledObjectState returns whether the input ScaledObject is active as a first parameters,
Expand Down
8 changes: 4 additions & 4 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,10 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
}

if !metricsFoundInCache {
startTime := time.Now()
metrics, err = cache.GetMetricsForScaler(ctx, scalerIndex, metricName)
scalerLatency := time.Since(startTime).Milliseconds()
prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(scalerLatency))
metrics, latency, err := cache.GetMetricsForScaler(ctx, scalerIndex, metricName)
if latency != -1 {
prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency))
}
h.logger.V(1).Info("Getting metrics from scaler", "scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName, "scaler", scalerName, "metricName", metricSpec.External.Metric.Name, "metrics", metrics, "scalerError", err)
}
metrics, err = fallback.GetMetricsWithFallback(ctx, h.client, h.logger, metrics, err, metricName, scaledObject, metricSpec)
Expand Down

0 comments on commit c62d21a

Please sign in to comment.