Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use mili scale to support more precision #3125

Merged
merged 17 commits into from
Jun 14, 2022
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ cover.out

# GO debug binary
cmd/manager/debug.test

# GO Test result
report.xml
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md

### Improvements

- **General:** Use `mili` scale for the returned metrics ([#3135](https://github.com/kedacore/keda/issue/3135))
- **General:** Use more readable timestamps in KEDA Operator logs ([#3066](https://github.com/kedacore/keda/issue/3066))
- **Prometheus Scaler:** Add ignoreNullValues to return error when prometheus return null in values ([#3065](https://github.com/kedacore/keda/issues/3065))
- **Selenium Grid Scaler:** Edge active sessions not being properly counted ([#2709](https://github.com/kedacore/keda/issues/2709))
Expand Down
8 changes: 1 addition & 7 deletions pkg/scalers/activemq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"text/template"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -275,11 +273,7 @@ func (s *activeMQScaler) GetMetrics(ctx context.Context, metricName string, metr
return nil, fmt.Errorf("error inspecting ActiveMQ queue size: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(queueSize, resource.DecimalSI),
Timestamp: metav1.Now(),
}
metric := GenerateMetricInMili(metricName, float64(queueSize))

return []external_metrics.ExternalMetricValue{metric}, nil
}
Expand Down
8 changes: 1 addition & 7 deletions pkg/scalers/artemis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"strings"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -277,11 +275,7 @@ func (s *artemisScaler) GetMetrics(ctx context.Context, metricName string, metri
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(messages, resource.DecimalSI),
Timestamp: metav1.Now(),
}
metric := GenerateMetricInMili(metricName, float64(messages))

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Expand Down
40 changes: 25 additions & 15 deletions pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -43,8 +41,8 @@ type awsCloudwatchMetadata struct {
dimensionValue []string
expression string

targetMetricValue int64
minMetricValue int64
targetMetricValue float64
minMetricValue float64

metricCollectionTime int64
metricStat string
Expand Down Expand Up @@ -96,6 +94,22 @@ func getIntMetadataValue(metadata map[string]string, key string, required bool,
return defaultValue, nil
}

func getFloatMetadataValue(metadata map[string]string, key string, required bool, defaultValue float64) (float64, error) {
if val, ok := metadata[key]; ok && val != "" {
value, err := strconv.ParseFloat(val, 64)
if err != nil {
return 0, fmt.Errorf("error parsing %s metadata: %v", key, err)
}
return value, nil
}

if required {
return 0, fmt.Errorf("metadata %s not given", key)
}

return defaultValue, nil
}

func createCloudwatchClient(metadata *awsCloudwatchMetadata) *cloudwatch.CloudWatch {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(metadata.awsRegion),
Expand Down Expand Up @@ -167,12 +181,12 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e
}
}

meta.targetMetricValue, err = getIntMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0)
meta.targetMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0)
if err != nil {
return nil, err
}

meta.minMetricValue, err = getIntMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0)
meta.minMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -278,11 +292,7 @@ func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string,
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(metricValue, resource.DecimalSI),
Timestamp: metav1.Now(),
}
metric := GenerateMetricInMili(metricName, metricValue)

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Expand All @@ -300,7 +310,7 @@ func (c *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2beta2
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(c.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-cloudwatch-%s", metricNameSuffix))),
},
Target: GetMetricTarget(c.metricType, c.metadata.targetMetricValue),
Target: GetMetricTargetMili(c.metricType, c.metadata.targetMetricValue),
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta2.MetricSpec{metricSpec}
Expand All @@ -320,7 +330,7 @@ func (c *awsCloudwatchScaler) Close(context.Context) error {
return nil
}

func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (int64, error) {
func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
var input cloudwatch.GetMetricDataInput

startTime, endTime := computeQueryWindow(time.Now(), c.metadata.metricStatPeriod, c.metadata.metricEndTimeOffset, c.metadata.metricCollectionTime)
Expand Down Expand Up @@ -384,9 +394,9 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (int64, error) {
}

cloudwatchLog.V(1).Info("Received Metric Data", "data", output)
var metricValue int64
var metricValue float64
if len(output.MetricDataResults) > 0 && len(output.MetricDataResults[0].Values) > 0 {
metricValue = int64(*output.MetricDataResults[0].Values[0])
metricValue = *output.MetricDataResults[0].Values[0]
} else {
cloudwatchLog.Info("empty metric data received, returning minMetricValue")
metricValue = c.metadata.minMetricValue
Expand Down
12 changes: 3 additions & 9 deletions pkg/scalers/aws_dynamodb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"go.mongodb.org/mongo-driver/bson"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -168,11 +166,7 @@ func (c *awsDynamoDBScaler) GetMetrics(ctx context.Context, metricName string, m
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(metricValue, resource.DecimalSI),
Timestamp: metav1.Now(),
}
metric := GenerateMetricInMili(metricName, metricValue)

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Expand Down Expand Up @@ -204,7 +198,7 @@ func (c *awsDynamoDBScaler) Close(context.Context) error {
return nil
}

func (c *awsDynamoDBScaler) GetQueryMetrics() (int64, error) {
func (c *awsDynamoDBScaler) GetQueryMetrics() (float64, error) {
dimensions := dynamodb.QueryInput{
TableName: aws.String(c.metadata.tableName),
KeyConditionExpression: aws.String(c.metadata.keyConditionExpression),
Expand All @@ -218,7 +212,7 @@ func (c *awsDynamoDBScaler) GetQueryMetrics() (int64, error) {
return 0, err
}

return *res.Count, nil
return float64(*res.Count), nil
}

// json2Map convert Json to map[string]string
Expand Down
8 changes: 1 addition & 7 deletions pkg/scalers/aws_kinesis_stream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -158,11 +156,7 @@ func (s *awsKinesisStreamScaler) GetMetrics(ctx context.Context, metricName stri
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(shardCount, resource.DecimalSI),
Timestamp: metav1.Now(),
}
metric := GenerateMetricInMili(metricName, float64(shardCount))

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Expand Down
8 changes: 1 addition & 7 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -181,11 +179,7 @@ func (s *awsSqsQueueScaler) GetMetrics(ctx context.Context, metricName string, m
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(queuelen, resource.DecimalSI),
Timestamp: metav1.Now(),
}
metric := GenerateMetricInMili(metricName, float64(queuelen))

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/scalers/azure/azure_app_insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package azure
import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -77,7 +76,7 @@ func getAuthConfig(ctx context.Context, info AppInsightsInfo, podIdentity kedav1
return nil
}

func extractAppInsightValue(info AppInsightsInfo, metric ApplicationInsightsMetric) (int64, error) {
func extractAppInsightValue(info AppInsightsInfo, metric ApplicationInsightsMetric) (float64, error) {
if _, ok := metric.Value[info.MetricID]; !ok {
return -1, fmt.Errorf("metric named %s not found in app insights response", info.MetricID)
}
Expand All @@ -94,7 +93,7 @@ func extractAppInsightValue(info AppInsightsInfo, metric ApplicationInsightsMetr

azureAppInsightsLog.V(2).Info("value extracted from metric request", "metric type", info.AggregationType, "metric value", floatVal)

return int64(math.Round(floatVal)), nil
return floatVal, nil
}

func queryParamsForAppInsightsRequest(info AppInsightsInfo) (map[string]interface{}, error) {
Expand All @@ -115,7 +114,7 @@ func queryParamsForAppInsightsRequest(info AppInsightsInfo) (map[string]interfac
}

// GetAzureAppInsightsMetricValue returns the value of an Azure App Insights metric, rounded to the nearest int
func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.PodIdentityProvider) (int64, error) {
func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.PodIdentityProvider) (float64, error) {
config := getAuthConfig(ctx, info, podIdentity)
authorizer, err := config.Authorizer()
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/scalers/azure/azure_app_insights_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
type testExtractAzAppInsightsTestData struct {
testName string
isError bool
expectedValue int64
expectedValue float64
info AppInsightsInfo
metricResult ApplicationInsightsMetric
}
Expand Down Expand Up @@ -48,8 +48,6 @@ var testExtractAzAppInsightsData = []testExtractAzAppInsightsTestData{
{"metric not found", true, -1, mockAppInsightsInfo("avg"), mockAppInsightsMetric("test/test", "avg", newMetricValue(0.0))},
{"metric is nil", true, -1, mockAppInsightsInfo("avg"), mockAppInsightsMetric("testns/test", "avg", nil)},
{"incorrect aggregation type", true, -1, mockAppInsightsInfo("avg"), mockAppInsightsMetric("testns/test", "max", newMetricValue(0.0))},
{"success round down value", false, 5, mockAppInsightsInfo("max"), mockAppInsightsMetric("testns/test", "max", newMetricValue(5.2))},
{"success round up value", false, 6, mockAppInsightsInfo("max"), mockAppInsightsMetric("testns/test", "max", newMetricValue(5.5))},
}

func TestAzGetAzureAppInsightsMetricValue(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/scalers/azure/azure_data_explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type DataExplorerMetadata struct {
PodIdentity kedav1alpha1.PodIdentityProvider
Query string
TenantID string
Threshold int64
Threshold float64
ActiveDirectoryEndpoint string
}

Expand Down Expand Up @@ -95,7 +95,7 @@ func getDataExplorerAuthConfig(ctx context.Context, metadata *DataExplorerMetada
return nil, fmt.Errorf("missing credentials. please reconfigure your scaled object metadata")
}

func GetAzureDataExplorerMetricValue(ctx context.Context, client *kusto.Client, db string, query string) (int64, error) {
func GetAzureDataExplorerMetricValue(ctx context.Context, client *kusto.Client, db string, query string) (float64, error) {
azureDataExplorerLogger.V(1).Info("Querying Azure Data Explorer", "db", db, "query", query)

iter, err := client.Query(ctx, db, kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: false})).UnsafeAdd(query))
Expand Down Expand Up @@ -130,7 +130,7 @@ func GetAzureDataExplorerMetricValue(ctx context.Context, client *kusto.Client,
return metricValue, nil
}

func extractDataExplorerMetricValue(row *table.Row) (int64, error) {
func extractDataExplorerMetricValue(row *table.Row) (float64, error) {
if row == nil || len(row.ColumnTypes) == 0 {
return -1, fmt.Errorf("query has no results")
}
Expand All @@ -141,14 +141,14 @@ func extractDataExplorerMetricValue(row *table.Row) (int64, error) {
return -1, fmt.Errorf("data type %s is not valid", dataType)
}

value, err := strconv.Atoi(row.Values[0].String())
value, err := strconv.ParseFloat(row.Values[0].String(), 64)
if err != nil {
return -1, fmt.Errorf("failed to convert result %s to int", row.Values[0].String())
}
if value < 0 {
return -1, fmt.Errorf("query result must be >= 0 but received: %d", value)
return -1, fmt.Errorf("query result must be >= 0 but received: %f", value)
}

azureDataExplorerLogger.V(1).Info("Query Result", "value", value, "dataType", dataType)
return int64(value), nil
return value, nil
}
10 changes: 3 additions & 7 deletions pkg/scalers/azure/azure_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package azure
import (
"context"
"fmt"
"math"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -67,7 +66,7 @@ type MonitorInfo struct {
var azureMonitorLog = logf.Log.WithName("azure_monitor_scaler")

// GetAzureMetricValue returns the value of an Azure Monitor metric, rounded to the nearest int
func GetAzureMetricValue(ctx context.Context, info MonitorInfo, podIdentity kedav1alpha1.PodIdentityProvider) (int64, error) {
func GetAzureMetricValue(ctx context.Context, info MonitorInfo, podIdentity kedav1alpha1.PodIdentityProvider) (float64, error) {
client := createMetricsClient(ctx, info, podIdentity)
requestPtr, err := createMetricsRequest(info)
if err != nil {
Expand Down Expand Up @@ -128,16 +127,13 @@ func createMetricsRequest(info MonitorInfo) (*azureExternalMetricRequest, error)
return &metricRequest, nil
}

func executeRequest(ctx context.Context, client insights.MetricsClient, request *azureExternalMetricRequest) (int64, error) {
func executeRequest(ctx context.Context, client insights.MetricsClient, request *azureExternalMetricRequest) (float64, error) {
metricResponse, err := getAzureMetric(ctx, client, *request)
if err != nil {
return -1, fmt.Errorf("error getting azure monitor metric %s: %w", request.MetricName, err)
}

// casting drops everything after decimal, so round first
metricValue := int64(math.Round(metricResponse))

return metricValue, nil
return metricResponse, nil
}

func getAzureMetric(ctx context.Context, client insights.MetricsClient, azMetricRequest azureExternalMetricRequest) (float64, error) {
Expand Down
Loading