From 68cc5fb59276f69579a64d890d67bcd9347fa43f Mon Sep 17 00:00:00 2001 From: Jan Wozniak Date: Wed, 10 Apr 2024 14:04:35 +0200 Subject: [PATCH] reimplement prometheus config parsing Signed-off-by: Jan Wozniak --- pkg/scalers/prometheus_scaler.go | 144 ++++++-------------------- pkg/scalers/prometheus_scaler_test.go | 18 ++-- 2 files changed, 42 insertions(+), 120 deletions(-) diff --git a/pkg/scalers/prometheus_scaler.go b/pkg/scalers/prometheus_scaler.go index b46cf271501..a262994be57 100644 --- a/pkg/scalers/prometheus_scaler.go +++ b/pkg/scalers/prometheus_scaler.go @@ -49,22 +49,24 @@ type prometheusScaler struct { logger logr.Logger } +// sometimes should consider there is an error we can accept +// default value is true/t, to ignore the null value return from prometheus +// change to false/f if can not accept prometheus return null values +// https://github.com/kedacore/keda/issues/3065 type prometheusMetadata struct { - serverAddress string - query string - queryParameters map[string]string - threshold float64 - activationThreshold float64 - prometheusAuth *authentication.AuthMeta - namespace string - triggerIndex int - customHeaders map[string]string - // sometimes should consider there is an error we can accept - // default value is true/t, to ignore the null value return from prometheus - // change to false/f if can not accept prometheus return null values - // https://github.com/kedacore/keda/issues/3065 - ignoreNullValues bool - unsafeSsl bool + prometheusAuth *authentication.AuthMeta + triggerIndex int + + ServerAddress string `keda:"name=serverAddress, parsingOrder=triggerMetadata"` + Query string `keda:"name=query, parsingOrder=triggerMetadata"` + QueryParameters map[string]string `keda:"name=queryParameters, parsingOrder=triggerMetadata, optional"` + Threshold float64 `keda:"name=threshold, parsingOrder=triggerMetadata"` + ActivationThreshold float64 `keda:"name=activationThreshold, parsingOrder=triggerMetadata"` + Namespace string `keda:"name=namespace, parsingOrder=triggerMetadata, optional"` + CustomHeaders map[string]string `keda:"name=customHeaders, parsingOrder=triggerMetadata, optional"` + IgnoreNullValues bool `keda:"name=ignoreNullValues, parsingOrder=triggerMetadata, optional, default=true"` + UnsafeSSL bool `keda:"name=unsafeSsl, parsingOrder=triggerMetadata, optional"` + CortexOrgID string `keda:"name=cortexOrgID, parsingOrder=triggerMetadata, optional, deprecated=use customHeaders instead"` } type promQueryResult struct { @@ -93,7 +95,7 @@ func NewPrometheusScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { return nil, fmt.Errorf("error parsing prometheus metadata: %w", err) } - httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.unsafeSsl) + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.UnsafeSSL) if meta.prometheusAuth != nil { if meta.prometheusAuth.CA != "" || meta.prometheusAuth.EnableTLS { @@ -153,91 +155,11 @@ func NewPrometheusScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { func parsePrometheusMetadata(config *scalersconfig.ScalerConfig) (meta *prometheusMetadata, err error) { meta = &prometheusMetadata{} - - if val, ok := config.TriggerMetadata[promServerAddress]; ok && val != "" { - meta.serverAddress = val - } else { - return nil, fmt.Errorf("no %s given", promServerAddress) - } - - if val, ok := config.TriggerMetadata[promQuery]; ok && val != "" { - meta.query = val - } else { - return nil, fmt.Errorf("no %s given", promQuery) - } - - if val, ok := config.TriggerMetadata[promQueryParameters]; ok && val != "" { - queryParameters, err := kedautil.ParseStringList(val) - if err != nil { - return nil, fmt.Errorf("error parsing %s: %w", promQueryParameters, err) - } - - meta.queryParameters = queryParameters - } - - if val, ok := config.TriggerMetadata[promThreshold]; ok && val != "" { - t, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("error parsing %s: %w", promThreshold, err) - } - - meta.threshold = t - } else { - if config.AsMetricSource { - meta.threshold = 0 - } else { - return nil, fmt.Errorf("no %s given", promThreshold) - } - } - - meta.activationThreshold = 0 - if val, ok := config.TriggerMetadata[promActivationThreshold]; ok { - t, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("activationThreshold parsing error %w", err) - } - - meta.activationThreshold = t - } - - if val, ok := config.TriggerMetadata[promNamespace]; ok && val != "" { - meta.namespace = val - } - - if val, ok := config.TriggerMetadata[promCortexScopeOrgID]; ok && val != "" { - return nil, fmt.Errorf("cortexOrgID is deprecated, please use customHeaders instead") - } - - if val, ok := config.TriggerMetadata[promCustomHeaders]; ok && val != "" { - customHeaders, err := kedautil.ParseStringList(val) - if err != nil { - return nil, fmt.Errorf("error parsing %s: %w", promCustomHeaders, err) - } - - meta.customHeaders = customHeaders - } - - meta.ignoreNullValues = defaultIgnoreNullValues - if val, ok := config.TriggerMetadata[ignoreNullValues]; ok && val != "" { - ignoreNullValues, err := strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("err incorrect value for ignoreNullValues given: %s, please use true or false", val) - } - meta.ignoreNullValues = ignoreNullValues - } - - meta.unsafeSsl = false - if val, ok := config.TriggerMetadata[unsafeSsl]; ok && val != "" { - unsafeSslValue, err := strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("error parsing %s: %w", unsafeSsl, err) - } - - meta.unsafeSsl = unsafeSslValue + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing prometheus metadata: %w", err) } meta.triggerIndex = config.TriggerIndex - err = parseAuthConfig(config, meta) if err != nil { return nil, err @@ -274,7 +196,7 @@ func (s *prometheusScaler) GetMetricSpecForScaling(context.Context) []v2.MetricS Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.threshold), + Target: GetMetricTargetMili(s.metricType, s.metadata.Threshold), } metricSpec := v2.MetricSpec{ External: externalMetric, Type: externalMetricType, @@ -284,15 +206,15 @@ func (s *prometheusScaler) GetMetricSpecForScaling(context.Context) []v2.MetricS func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error) { t := time.Now().UTC().Format(time.RFC3339) - queryEscaped := url_pkg.QueryEscape(s.metadata.query) - url := fmt.Sprintf("%s/api/v1/query?query=%s&time=%s", s.metadata.serverAddress, queryEscaped, t) + queryEscaped := url_pkg.QueryEscape(s.metadata.Query) + url := fmt.Sprintf("%s/api/v1/query?query=%s&time=%s", s.metadata.ServerAddress, queryEscaped, t) // set 'namespace' parameter for namespaced Prometheus requests (e.g. for Thanos Querier) - if s.metadata.namespace != "" { - url = fmt.Sprintf("%s&namespace=%s", url, s.metadata.namespace) + if s.metadata.Namespace != "" { + url = fmt.Sprintf("%s&namespace=%s", url, s.metadata.Namespace) } - for queryParameterKey, queryParameterValue := range s.metadata.queryParameters { + for queryParameterKey, queryParameterValue := range s.metadata.QueryParameters { queryParameterKeyEscaped := url_pkg.QueryEscape(queryParameterKey) queryParameterValueEscaped := url_pkg.QueryEscape(queryParameterValue) url = fmt.Sprintf("%s&%s=%s", url, queryParameterKeyEscaped, queryParameterValueEscaped) @@ -303,7 +225,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error return -1, err } - for headerName, headerValue := range s.metadata.customHeaders { + for headerName, headerValue := range s.metadata.CustomHeaders { req.Header.Add(headerName, headerValue) } @@ -345,22 +267,22 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error // allow for zero element or single element result sets if len(result.Data.Result) == 0 { - if s.metadata.ignoreNullValues { + if s.metadata.IgnoreNullValues { return 0, nil } return -1, fmt.Errorf("prometheus metrics 'prometheus' target may be lost, the result is empty") } else if len(result.Data.Result) > 1 { - return -1, fmt.Errorf("prometheus query %s returned multiple elements", s.metadata.query) + return -1, fmt.Errorf("prometheus query %s returned multiple elements", s.metadata.Query) } valueLen := len(result.Data.Result[0].Value) if valueLen == 0 { - if s.metadata.ignoreNullValues { + if s.metadata.IgnoreNullValues { return 0, nil } return -1, fmt.Errorf("prometheus metrics 'prometheus' target may be lost, the value list is empty") } else if valueLen < 2 { - return -1, fmt.Errorf("prometheus query %s didn't return enough values", s.metadata.query) + return -1, fmt.Errorf("prometheus query %s didn't return enough values", s.metadata.Query) } val := result.Data.Result[0].Value[1] @@ -374,7 +296,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error } if math.IsInf(v, 0) { - if s.metadata.ignoreNullValues { + if s.metadata.IgnoreNullValues { return 0, nil } err := fmt.Errorf("promtheus query returns %f", v) @@ -394,5 +316,5 @@ func (s *prometheusScaler) GetMetricsAndActivity(ctx context.Context, metricName metric := GenerateMetricInMili(metricName, val) - return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.activationThreshold, nil + return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.ActivationThreshold, nil } diff --git a/pkg/scalers/prometheus_scaler_test.go b/pkg/scalers/prometheus_scaler_test.go index 844920f6d12..04dd524551e 100644 --- a/pkg/scalers/prometheus_scaler_test.go +++ b/pkg/scalers/prometheus_scaler_test.go @@ -317,9 +317,9 @@ func TestPrometheusScalerExecutePromQuery(t *testing.T) { scaler := prometheusScaler{ metadata: &prometheusMetadata{ - serverAddress: server.URL, - ignoreNullValues: testData.ignoreNullValues, - unsafeSsl: testData.unsafeSsl, + ServerAddress: server.URL, + IgnoreNullValues: testData.ignoreNullValues, + UnsafeSSL: testData.unsafeSsl, }, httpClient: http.DefaultClient, logger: logr.Discard(), @@ -366,9 +366,9 @@ func TestPrometheusScalerCustomHeaders(t *testing.T) { scaler := prometheusScaler{ metadata: &prometheusMetadata{ - serverAddress: server.URL, - customHeaders: customHeadersValue, - ignoreNullValues: testData.ignoreNullValues, + ServerAddress: server.URL, + CustomHeaders: customHeadersValue, + IgnoreNullValues: testData.ignoreNullValues, }, httpClient: http.DefaultClient, } @@ -410,9 +410,9 @@ func TestPrometheusScalerExecutePromQueryParameters(t *testing.T) { })) scaler := prometheusScaler{ metadata: &prometheusMetadata{ - serverAddress: server.URL, - queryParameters: queryParametersValue, - ignoreNullValues: testData.ignoreNullValues, + ServerAddress: server.URL, + QueryParameters: queryParametersValue, + IgnoreNullValues: testData.ignoreNullValues, }, httpClient: http.DefaultClient, }