Skip to content

Commit

Permalink
feat: Use mili scale to support more precision (kedacore#3125)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorge Turrado Ferrero authored and aviadlevy committed Jun 16, 2022
1 parent 3ef9f29 commit 1c57a93
Show file tree
Hide file tree
Showing 58 changed files with 255 additions and 507 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ cover.out

# GO debug binary
cmd/manager/debug.test

# GO Test result
report.xml
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md

### Improvements

- **General:** Use `mili` scale for the returned metrics ([#3135](https://github.com/kedacore/keda/issue/3135))
- **General:** Use more readable timestamps in KEDA Operator logs ([#3066](https://github.com/kedacore/keda/issue/3066))
- **Prometheus Scaler:** Add ignoreNullValues to return error when prometheus return null in values ([#3065](https://github.com/kedacore/keda/issues/3065))
- **Selenium Grid Scaler:** Edge active sessions not being properly counted ([#2709](https://github.com/kedacore/keda/issues/2709))
Expand Down
8 changes: 1 addition & 7 deletions pkg/scalers/activemq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"text/template"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -275,11 +273,7 @@ func (s *activeMQScaler) GetMetrics(ctx context.Context, metricName string, metr
return nil, fmt.Errorf("error inspecting ActiveMQ queue size: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(queueSize, resource.DecimalSI),
Timestamp: metav1.Now(),
}
metric := GenerateMetricInMili(metricName, float64(queueSize))

return []external_metrics.ExternalMetricValue{metric}, nil
}
Expand Down
8 changes: 1 addition & 7 deletions pkg/scalers/artemis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"strings"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -277,11 +275,7 @@ func (s *artemisScaler) GetMetrics(ctx context.Context, metricName string, metri
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(messages, resource.DecimalSI),
Timestamp: metav1.Now(),
}
metric := GenerateMetricInMili(metricName, float64(messages))

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Expand Down
40 changes: 25 additions & 15 deletions pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -43,8 +41,8 @@ type awsCloudwatchMetadata struct {
dimensionValue []string
expression string

targetMetricValue int64
minMetricValue int64
targetMetricValue float64
minMetricValue float64

metricCollectionTime int64
metricStat string
Expand Down Expand Up @@ -96,6 +94,22 @@ func getIntMetadataValue(metadata map[string]string, key string, required bool,
return defaultValue, nil
}

func getFloatMetadataValue(metadata map[string]string, key string, required bool, defaultValue float64) (float64, error) {
if val, ok := metadata[key]; ok && val != "" {
value, err := strconv.ParseFloat(val, 64)
if err != nil {
return 0, fmt.Errorf("error parsing %s metadata: %v", key, err)
}
return value, nil
}

if required {
return 0, fmt.Errorf("metadata %s not given", key)
}

return defaultValue, nil
}

func createCloudwatchClient(metadata *awsCloudwatchMetadata) *cloudwatch.CloudWatch {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(metadata.awsRegion),
Expand Down Expand Up @@ -167,12 +181,12 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e
}
}

meta.targetMetricValue, err = getIntMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0)
meta.targetMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0)
if err != nil {
return nil, err
}

meta.minMetricValue, err = getIntMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0)
meta.minMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -278,11 +292,7 @@ func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string,
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(metricValue, resource.DecimalSI),
Timestamp: metav1.Now(),
}
metric := GenerateMetricInMili(metricName, metricValue)

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Expand All @@ -300,7 +310,7 @@ func (c *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2beta2
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(c.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-cloudwatch-%s", metricNameSuffix))),
},
Target: GetMetricTarget(c.metricType, c.metadata.targetMetricValue),
Target: GetMetricTargetMili(c.metricType, c.metadata.targetMetricValue),
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta2.MetricSpec{metricSpec}
Expand All @@ -320,7 +330,7 @@ func (c *awsCloudwatchScaler) Close(context.Context) error {
return nil
}

func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (int64, error) {
func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
var input cloudwatch.GetMetricDataInput

startTime, endTime := computeQueryWindow(time.Now(), c.metadata.metricStatPeriod, c.metadata.metricEndTimeOffset, c.metadata.metricCollectionTime)
Expand Down Expand Up @@ -384,9 +394,9 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (int64, error) {
}

cloudwatchLog.V(1).Info("Received Metric Data", "data", output)
var metricValue int64
var metricValue float64
if len(output.MetricDataResults) > 0 && len(output.MetricDataResults[0].Values) > 0 {
metricValue = int64(*output.MetricDataResults[0].Values[0])
metricValue = *output.MetricDataResults[0].Values[0]
} else {
cloudwatchLog.Info("empty metric data received, returning minMetricValue")
metricValue = c.metadata.minMetricValue
Expand Down
12 changes: 3 additions & 9 deletions pkg/scalers/aws_dynamodb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"go.mongodb.org/mongo-driver/bson"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -168,11 +166,7 @@ func (c *awsDynamoDBScaler) GetMetrics(ctx context.Context, metricName string, m
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(metricValue, resource.DecimalSI),
Timestamp: metav1.Now(),
}
metric := GenerateMetricInMili(metricName, metricValue)

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Expand Down Expand Up @@ -204,7 +198,7 @@ func (c *awsDynamoDBScaler) Close(context.Context) error {
return nil
}

func (c *awsDynamoDBScaler) GetQueryMetrics() (int64, error) {
func (c *awsDynamoDBScaler) GetQueryMetrics() (float64, error) {
dimensions := dynamodb.QueryInput{
TableName: aws.String(c.metadata.tableName),
KeyConditionExpression: aws.String(c.metadata.keyConditionExpression),
Expand All @@ -218,7 +212,7 @@ func (c *awsDynamoDBScaler) GetQueryMetrics() (int64, error) {
return 0, err
}

return *res.Count, nil
return float64(*res.Count), nil
}

// json2Map convert Json to map[string]string
Expand Down
8 changes: 1 addition & 7 deletions pkg/scalers/aws_kinesis_stream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -158,11 +156,7 @@ func (s *awsKinesisStreamScaler) GetMetrics(ctx context.Context, metricName stri
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(shardCount, resource.DecimalSI),
Timestamp: metav1.Now(),
}
metric := GenerateMetricInMili(metricName, float64(shardCount))

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Expand Down
8 changes: 1 addition & 7 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -181,11 +179,7 @@ func (s *awsSqsQueueScaler) GetMetrics(ctx context.Context, metricName string, m
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(queuelen, resource.DecimalSI),
Timestamp: metav1.Now(),
}
metric := GenerateMetricInMili(metricName, float64(queuelen))

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/scalers/azure/azure_app_insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package azure
import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -77,7 +76,7 @@ func getAuthConfig(ctx context.Context, info AppInsightsInfo, podIdentity kedav1
return nil
}

func extractAppInsightValue(info AppInsightsInfo, metric ApplicationInsightsMetric) (int64, error) {
func extractAppInsightValue(info AppInsightsInfo, metric ApplicationInsightsMetric) (float64, error) {
if _, ok := metric.Value[info.MetricID]; !ok {
return -1, fmt.Errorf("metric named %s not found in app insights response", info.MetricID)
}
Expand All @@ -94,7 +93,7 @@ func extractAppInsightValue(info AppInsightsInfo, metric ApplicationInsightsMetr

azureAppInsightsLog.V(2).Info("value extracted from metric request", "metric type", info.AggregationType, "metric value", floatVal)

return int64(math.Round(floatVal)), nil
return floatVal, nil
}

func queryParamsForAppInsightsRequest(info AppInsightsInfo) (map[string]interface{}, error) {
Expand All @@ -115,7 +114,7 @@ func queryParamsForAppInsightsRequest(info AppInsightsInfo) (map[string]interfac
}

// GetAzureAppInsightsMetricValue returns the value of an Azure App Insights metric, rounded to the nearest int
func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.PodIdentityProvider) (int64, error) {
func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.PodIdentityProvider) (float64, error) {
config := getAuthConfig(ctx, info, podIdentity)
authorizer, err := config.Authorizer()
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/scalers/azure/azure_app_insights_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
type testExtractAzAppInsightsTestData struct {
testName string
isError bool
expectedValue int64
expectedValue float64
info AppInsightsInfo
metricResult ApplicationInsightsMetric
}
Expand Down Expand Up @@ -48,8 +48,6 @@ var testExtractAzAppInsightsData = []testExtractAzAppInsightsTestData{
{"metric not found", true, -1, mockAppInsightsInfo("avg"), mockAppInsightsMetric("test/test", "avg", newMetricValue(0.0))},
{"metric is nil", true, -1, mockAppInsightsInfo("avg"), mockAppInsightsMetric("testns/test", "avg", nil)},
{"incorrect aggregation type", true, -1, mockAppInsightsInfo("avg"), mockAppInsightsMetric("testns/test", "max", newMetricValue(0.0))},
{"success round down value", false, 5, mockAppInsightsInfo("max"), mockAppInsightsMetric("testns/test", "max", newMetricValue(5.2))},
{"success round up value", false, 6, mockAppInsightsInfo("max"), mockAppInsightsMetric("testns/test", "max", newMetricValue(5.5))},
}

func TestAzGetAzureAppInsightsMetricValue(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/scalers/azure/azure_data_explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type DataExplorerMetadata struct {
PodIdentity kedav1alpha1.PodIdentityProvider
Query string
TenantID string
Threshold int64
Threshold float64
ActiveDirectoryEndpoint string
}

Expand Down Expand Up @@ -95,7 +95,7 @@ func getDataExplorerAuthConfig(ctx context.Context, metadata *DataExplorerMetada
return nil, fmt.Errorf("missing credentials. please reconfigure your scaled object metadata")
}

func GetAzureDataExplorerMetricValue(ctx context.Context, client *kusto.Client, db string, query string) (int64, error) {
func GetAzureDataExplorerMetricValue(ctx context.Context, client *kusto.Client, db string, query string) (float64, error) {
azureDataExplorerLogger.V(1).Info("Querying Azure Data Explorer", "db", db, "query", query)

iter, err := client.Query(ctx, db, kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: false})).UnsafeAdd(query))
Expand Down Expand Up @@ -130,7 +130,7 @@ func GetAzureDataExplorerMetricValue(ctx context.Context, client *kusto.Client,
return metricValue, nil
}

func extractDataExplorerMetricValue(row *table.Row) (int64, error) {
func extractDataExplorerMetricValue(row *table.Row) (float64, error) {
if row == nil || len(row.ColumnTypes) == 0 {
return -1, fmt.Errorf("query has no results")
}
Expand All @@ -141,14 +141,14 @@ func extractDataExplorerMetricValue(row *table.Row) (int64, error) {
return -1, fmt.Errorf("data type %s is not valid", dataType)
}

value, err := strconv.Atoi(row.Values[0].String())
value, err := strconv.ParseFloat(row.Values[0].String(), 64)
if err != nil {
return -1, fmt.Errorf("failed to convert result %s to int", row.Values[0].String())
}
if value < 0 {
return -1, fmt.Errorf("query result must be >= 0 but received: %d", value)
return -1, fmt.Errorf("query result must be >= 0 but received: %f", value)
}

azureDataExplorerLogger.V(1).Info("Query Result", "value", value, "dataType", dataType)
return int64(value), nil
return value, nil
}
10 changes: 3 additions & 7 deletions pkg/scalers/azure/azure_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package azure
import (
"context"
"fmt"
"math"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -67,7 +66,7 @@ type MonitorInfo struct {
var azureMonitorLog = logf.Log.WithName("azure_monitor_scaler")

// GetAzureMetricValue returns the value of an Azure Monitor metric, rounded to the nearest int
func GetAzureMetricValue(ctx context.Context, info MonitorInfo, podIdentity kedav1alpha1.PodIdentityProvider) (int64, error) {
func GetAzureMetricValue(ctx context.Context, info MonitorInfo, podIdentity kedav1alpha1.PodIdentityProvider) (float64, error) {
client := createMetricsClient(ctx, info, podIdentity)
requestPtr, err := createMetricsRequest(info)
if err != nil {
Expand Down Expand Up @@ -128,16 +127,13 @@ func createMetricsRequest(info MonitorInfo) (*azureExternalMetricRequest, error)
return &metricRequest, nil
}

func executeRequest(ctx context.Context, client insights.MetricsClient, request *azureExternalMetricRequest) (int64, error) {
func executeRequest(ctx context.Context, client insights.MetricsClient, request *azureExternalMetricRequest) (float64, error) {
metricResponse, err := getAzureMetric(ctx, client, *request)
if err != nil {
return -1, fmt.Errorf("error getting azure monitor metric %s: %w", request.MetricName, err)
}

// casting drops everything after decimal, so round first
metricValue := int64(math.Round(metricResponse))

return metricValue, nil
return metricResponse, nil
}

func getAzureMetric(ctx context.Context, client insights.MetricsClient, azMetricRequest azureExternalMetricRequest) (float64, error) {
Expand Down
Loading

0 comments on commit 1c57a93

Please sign in to comment.