Skip to content

Commit

Permalink
Internally represent value and threshold as int64 (#2801)
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
  • Loading branch information
zroubalik authored Mar 23, 2022
1 parent f8f86bd commit 76527d5
Show file tree
Hide file tree
Showing 45 changed files with 227 additions and 228 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
- **General:** Fix mismatched errors for updating HPA ([#2719](https://github.com/kedacore/keda/issues/2719))
- **General:** Improve e2e tests reliability ([#2580](https://github.com/kedacore/keda/issues/2580))
- **General:** Improve e2e tests to always cleanup resources in cluster ([#2584](https://github.com/kedacore/keda/issues/2584))
- **General:** Internally represent value and threshold as int64 ([#2790](https://github.com/kedacore/keda/issues/2790))
- **GCP Pubsub Scaler:** Adding e2e test ([#1528](https://github.com/kedacore/keda/issues/1528))
- **Memory Scaler:** Adding e2e test ([#2220](https://github.com/kedacore/keda/issues/2220))

Expand Down
2 changes: 1 addition & 1 deletion CREATE-NEW-SCALER.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ For example:
>**Note:** There is a naming helper function `GenerateMetricNameWithIndex(scalerIndex int, metricName string)`, that receives the current index and the original metric name (without the prefix) and returns the concatenated string using the convention (please use this function).<br>Next lines are an example about how to use it:
>```golang
>func (s *artemisScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
> targetMetricValue := resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI)
> targetMetricValue := resource.NewQuantity(s.metadata.queueLength, resource.DecimalSI)
> externalMetric := &v2beta2.ExternalMetricSource{
> Metric: v2beta2.MetricIdentifier{
> Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "artemis", s.metadata.brokerName, s.metadata.queueName))),
Expand Down
14 changes: 7 additions & 7 deletions pkg/scalers/activemq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type activeMQMetadata struct {
username string
password string
restAPITemplate string
targetQueueSize int
targetQueueSize int64
metricName string
scalerIndex int
}
Expand Down Expand Up @@ -94,7 +94,7 @@ func parseActiveMQMetadata(config *ScalerConfig) (*activeMQMetadata, error) {
}

if val, ok := config.TriggerMetadata["targetQueueSize"]; ok {
queueSize, err := strconv.Atoi(val)
queueSize, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid targetQueueSize - must be an integer")
}
Expand Down Expand Up @@ -200,9 +200,9 @@ func (s *activeMQScaler) getMonitoringEndpoint() (string, error) {
return monitoringEndpoint, nil
}

func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int, error) {
func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int64, error) {
var monitoringInfo *activeMQMonitoring
var queueMessageCount int
var queueMessageCount int64

client := s.httpClient
url, err := s.getMonitoringEndpoint()
Expand Down Expand Up @@ -230,7 +230,7 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int, error)
return -1, err
}
if resp.StatusCode == 200 && monitoringInfo.Status == 200 {
queueMessageCount = monitoringInfo.MsgCount
queueMessageCount = int64(monitoringInfo.MsgCount)
} else {
return -1, fmt.Errorf("ActiveMQ management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status)
}
Expand All @@ -242,7 +242,7 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int, error)

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *activeMQScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.targetQueueSize), resource.DecimalSI)
targetMetricValue := resource.NewQuantity(s.metadata.targetQueueSize, resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
Expand All @@ -266,7 +266,7 @@ func (s *activeMQScaler) GetMetrics(ctx context.Context, metricName string, metr

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(queueSize), resource.DecimalSI),
Value: *resource.NewQuantity(queueSize, resource.DecimalSI),
Timestamp: metav1.Now(),
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/scalers/artemis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type artemisMetadata struct {
username string
password string
restAPITemplate string
queueLength int
queueLength int64
corsHeader string
scalerIndex int
}
Expand Down Expand Up @@ -115,7 +115,7 @@ func parseArtemisMetadata(config *ScalerConfig) (*artemisMetadata, error) {
}

if val, ok := config.TriggerMetadata["queueLength"]; ok {
queueLength, err := strconv.Atoi(val)
queueLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("can't parse queueLength: %s", err)
}
Expand Down Expand Up @@ -214,9 +214,9 @@ func (s *artemisScaler) getMonitoringEndpoint() string {
return monitoringEndpoint
}

func (s *artemisScaler) getQueueMessageCount(ctx context.Context) (int, error) {
func (s *artemisScaler) getQueueMessageCount(ctx context.Context) (int64, error) {
var monitoringInfo *artemisMonitoring
messageCount := 0
var messageCount int64

client := s.httpClient
url := s.getMonitoringEndpoint()
Expand All @@ -240,7 +240,7 @@ func (s *artemisScaler) getQueueMessageCount(ctx context.Context) (int, error) {
return -1, err
}
if resp.StatusCode == 200 && monitoringInfo.Status == 200 {
messageCount = monitoringInfo.MsgCount
messageCount = int64(monitoringInfo.MsgCount)
} else {
return -1, fmt.Errorf("artemis management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status)
}
Expand All @@ -251,7 +251,7 @@ func (s *artemisScaler) getQueueMessageCount(ctx context.Context) (int, error) {
}

func (s *artemisScaler) GetMetricSpecForScaling(ctx context.Context) []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI)
targetMetricValue := resource.NewQuantity(s.metadata.queueLength, resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("artemis-%s", s.metadata.queueName))),
Expand All @@ -276,7 +276,7 @@ func (s *artemisScaler) GetMetrics(ctx context.Context, metricName string, metri

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(messages), resource.DecimalSI),
Value: *resource.NewQuantity(messages, resource.DecimalSI),
Timestamp: metav1.Now(),
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/scalers/aws_dynamodb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type awsDynamoDBMetadata struct {
keyConditionExpression string
expressionAttributeNames map[string]*string
expressionAttributeValues map[string]*dynamodb.AttributeValue
targetValue int
targetValue int64
awsAuthorization awsAuthorizationMetadata
scalerIndex int
metricName string
Expand Down Expand Up @@ -101,7 +101,7 @@ func parseAwsDynamoDBMetadata(config *ScalerConfig) (*awsDynamoDBMetadata, error
}

if val, ok := config.TriggerMetadata["targetValue"]; ok && val != "" {
n, err := strconv.Atoi(val)
n, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing metadata targetValue")
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func (c *awsDynamoDBScaler) GetMetrics(ctx context.Context, metricName string, m
}

func (c *awsDynamoDBScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(c.metadata.targetValue), resource.DecimalSI)
targetMetricValue := resource.NewQuantity(c.metadata.targetValue, resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: c.metadata.metricName,
Expand Down
6 changes: 3 additions & 3 deletions pkg/scalers/aws_kinesis_stream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type awsKinesisStreamScaler struct {
}

type awsKinesisStreamMetadata struct {
targetShardCount int
targetShardCount int64
streamName string
awsRegion string
awsAuthorization awsAuthorizationMetadata
Expand All @@ -59,7 +59,7 @@ func parseAwsKinesisStreamMetadata(config *ScalerConfig) (*awsKinesisStreamMetad
meta.targetShardCount = targetShardCountDefault

if val, ok := config.TriggerMetadata["shardCount"]; ok && val != "" {
shardCount, err := strconv.Atoi(val)
shardCount, err := strconv.ParseInt(val, 10, 64)
if err != nil {
meta.targetShardCount = targetShardCountDefault
kinesisStreamLog.Error(err, "Error parsing Kinesis stream metadata shardCount, using default %n", targetShardCountDefault)
Expand Down Expand Up @@ -133,7 +133,7 @@ func (s *awsKinesisStreamScaler) Close(context.Context) error {
}

func (s *awsKinesisStreamScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
targetShardCountQty := resource.NewQuantity(int64(s.metadata.targetShardCount), resource.DecimalSI)
targetShardCountQty := resource.NewQuantity(s.metadata.targetShardCount, resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-kinesis-%s", s.metadata.streamName))),
Expand Down
16 changes: 8 additions & 8 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type awsSqsQueueScaler struct {
}

type awsSqsQueueMetadata struct {
targetQueueLength int
targetQueueLength int64
queueURL string
queueName string
awsRegion string
Expand All @@ -68,7 +68,7 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig) (*awsSqsQueueMetadata, error
meta.targetQueueLength = defaultTargetQueueLength

if val, ok := config.TriggerMetadata["queueLength"]; ok && val != "" {
queueLength, err := strconv.Atoi(val)
queueLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
meta.targetQueueLength = targetQueueLengthDefault
sqsQueueLog.Error(err, "Error parsing SQS queue metadata queueLength, using default %n", targetQueueLengthDefault)
Expand Down Expand Up @@ -142,7 +142,7 @@ func createSqsClient(metadata *awsSqsQueueMetadata) *sqs.SQS {

// IsActive determines if we need to scale from zero
func (s *awsSqsQueueScaler) IsActive(ctx context.Context) (bool, error) {
length, err := s.GetAwsSqsQueueLength()
length, err := s.getAwsSqsQueueLength()

if err != nil {
return false, err
Expand All @@ -156,7 +156,7 @@ func (s *awsSqsQueueScaler) Close(context.Context) error {
}

func (s *awsSqsQueueScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
targetQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetQueueLength), resource.DecimalSI)
targetQueueLengthQty := resource.NewQuantity(s.metadata.targetQueueLength, resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-sqs-%s", s.metadata.queueName))),
Expand All @@ -172,7 +172,7 @@ func (s *awsSqsQueueScaler) GetMetricSpecForScaling(context.Context) []v2beta2.M

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *awsSqsQueueScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
queuelen, err := s.GetAwsSqsQueueLength()
queuelen, err := s.getAwsSqsQueueLength()

if err != nil {
sqsQueueLog.Error(err, "Error getting queue length")
Expand All @@ -181,15 +181,15 @@ func (s *awsSqsQueueScaler) GetMetrics(ctx context.Context, metricName string, m

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(queuelen), resource.DecimalSI),
Value: *resource.NewQuantity(queuelen, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// Get SQS Queue Length
func (s *awsSqsQueueScaler) GetAwsSqsQueueLength() (int32, error) {
func (s *awsSqsQueueScaler) getAwsSqsQueueLength() (int64, error) {
input := &sqs.GetQueueAttributesInput{
AttributeNames: aws.StringSlice(awsSqsQueueMetricNames),
QueueUrl: aws.String(s.metadata.queueURL),
Expand All @@ -209,5 +209,5 @@ func (s *awsSqsQueueScaler) GetAwsSqsQueueLength() (int32, error) {
approximateNumberOfMessages += metricValue
}

return int32(approximateNumberOfMessages), nil
return approximateNumberOfMessages, nil
}
6 changes: 3 additions & 3 deletions pkg/scalers/azure/azure_app_insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func getAuthConfig(info AppInsightsInfo, podIdentity kedav1alpha1.PodIdentityPro
return config
}

func extractAppInsightValue(info AppInsightsInfo, metric ApplicationInsightsMetric) (int32, error) {
func extractAppInsightValue(info AppInsightsInfo, metric ApplicationInsightsMetric) (int64, error) {
if _, ok := metric.Value[info.MetricID]; !ok {
return -1, fmt.Errorf("metric named %s not found in app insights response", info.MetricID)
}
Expand All @@ -81,7 +81,7 @@ func extractAppInsightValue(info AppInsightsInfo, metric ApplicationInsightsMetr

azureAppInsightsLog.V(2).Info("value extracted from metric request", "metric type", info.AggregationType, "metric value", floatVal)

return int32(math.Round(floatVal)), nil
return int64(math.Round(floatVal)), nil
}

func queryParamsForAppInsightsRequest(info AppInsightsInfo) (map[string]interface{}, error) {
Expand All @@ -102,7 +102,7 @@ func queryParamsForAppInsightsRequest(info AppInsightsInfo) (map[string]interfac
}

// GetAzureAppInsightsMetricValue returns the value of an Azure App Insights metric, rounded to the nearest int
func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.PodIdentityProvider) (int32, error) {
func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.PodIdentityProvider) (int64, error) {
config := getAuthConfig(info, podIdentity)
authorizer, err := config.Authorizer()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/azure/azure_app_insights_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
type testExtractAzAppInsightsTestData struct {
testName string
isError bool
expectedValue int32
expectedValue int64
info AppInsightsInfo
metricResult ApplicationInsightsMetric
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/scalers/azure/azure_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// GetAzureBlobListLength returns the count of the blobs in blob container in int
func GetAzureBlobListLength(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.PodIdentityProvider, connectionString, blobContainerName string, accountName string, blobDelimiter string, blobPrefix string, endpointSuffix string) (int, error) {
func GetAzureBlobListLength(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.PodIdentityProvider, connectionString, blobContainerName string, accountName string, blobDelimiter string, blobPrefix string, endpointSuffix string) (int64, error) {
credential, endpoint, err := ParseAzureStorageBlobConnection(ctx, httpClient, podIdentity, connectionString, accountName, endpointSuffix)
if err != nil {
return -1, err
Expand All @@ -44,5 +44,5 @@ func GetAzureBlobListLength(ctx context.Context, httpClient util.HTTPDoer, podId
return -1, err
}

return len(props.Segment.BlobItems), nil
return int64(len(props.Segment.BlobItems)), nil
}
6 changes: 3 additions & 3 deletions pkg/scalers/azure/azure_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type MonitorInfo struct {
var azureMonitorLog = logf.Log.WithName("azure_monitor_scaler")

// GetAzureMetricValue returns the value of an Azure Monitor metric, rounded to the nearest int
func GetAzureMetricValue(ctx context.Context, info MonitorInfo, podIdentity kedav1alpha1.PodIdentityProvider) (int32, error) {
func GetAzureMetricValue(ctx context.Context, info MonitorInfo, podIdentity kedav1alpha1.PodIdentityProvider) (int64, error) {
var podIdentityEnabled = true

if podIdentity == "" || podIdentity == kedav1alpha1.PodIdentityProviderNone {
Expand Down Expand Up @@ -121,14 +121,14 @@ func createMetricsRequest(info MonitorInfo) (*azureExternalMetricRequest, error)
return &metricRequest, nil
}

func executeRequest(ctx context.Context, client insights.MetricsClient, request *azureExternalMetricRequest) (int32, error) {
func executeRequest(ctx context.Context, client insights.MetricsClient, request *azureExternalMetricRequest) (int64, error) {
metricResponse, err := getAzureMetric(ctx, client, *request)
if err != nil {
return -1, fmt.Errorf("error getting azure monitor metric %s: %w", request.MetricName, err)
}

// casting drops everything after decimal, so round first
metricValue := int32(math.Round(metricResponse))
metricValue := int64(math.Round(metricResponse))

return metricValue, nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/scalers/azure/azure_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
)

// GetAzureQueueLength returns the length of a queue in int
func GetAzureQueueLength(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.PodIdentityProvider, connectionString, queueName, accountName, endpointSuffix string) (int32, error) {
func GetAzureQueueLength(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.PodIdentityProvider, connectionString, queueName, accountName, endpointSuffix string) (int64, error) {
credential, endpoint, err := ParseAzureStorageQueueConnection(ctx, httpClient, podIdentity, connectionString, accountName, endpointSuffix)
if err != nil {
return -1, err
Expand All @@ -46,7 +46,7 @@ func GetAzureQueueLength(ctx context.Context, httpClient util.HTTPDoer, podIdent
}

// Queue has less messages than we allowed to peek for, so no need to get the approximation
if visibleMessageCount < maxPeekMessages {
if visibleMessageCount < int64(maxPeekMessages) {
return visibleMessageCount, nil
}

Expand All @@ -55,15 +55,15 @@ func GetAzureQueueLength(ctx context.Context, httpClient util.HTTPDoer, podIdent
return -1, err
}

return props.ApproximateMessagesCount(), nil
return int64(props.ApproximateMessagesCount()), nil
}

func getVisibleCount(ctx context.Context, queueURL *azqueue.QueueURL, maxCount int32) (int32, error) {
func getVisibleCount(ctx context.Context, queueURL *azqueue.QueueURL, maxCount int32) (int64, error) {
messagesURL := queueURL.NewMessagesURL()
queue, err := messagesURL.Peek(ctx, maxCount)
if err != nil {
return 0, err
}
num := queue.NumMessages()
return num, nil
return int64(num), nil
}
Loading

0 comments on commit 76527d5

Please sign in to comment.