Skip to content

Commit

Permalink
reimplement prometheus config parsing
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Wozniak <wozniak.jan@gmail.com>
  • Loading branch information
wozniakjan committed Apr 30, 2024
1 parent 4797a83 commit 0cf6cd0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 124 deletions.
148 changes: 33 additions & 115 deletions pkg/scalers/prometheus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,31 @@ const (
unsafeSsl = "unsafeSsl"
)

var (
defaultIgnoreNullValues = true
)

type prometheusScaler struct {
metricType v2.MetricTargetType
metadata *prometheusMetadata
httpClient *http.Client
logger logr.Logger
}

// sometimes should consider there is an error we can accept
// default value is true/t, to ignore the null value return from prometheus
// change to false/f if can not accept prometheus return null values
// https://github.com/kedacore/keda/issues/3065
type prometheusMetadata struct {
serverAddress string
query string
queryParameters map[string]string
threshold float64
activationThreshold float64
prometheusAuth *authentication.AuthMeta
namespace string
triggerIndex int
customHeaders map[string]string
// sometimes should consider there is an error we can accept
// default value is true/t, to ignore the null value return from prometheus
// change to false/f if can not accept prometheus return null values
// https://github.com/kedacore/keda/issues/3065
ignoreNullValues bool
unsafeSsl bool
prometheusAuth *authentication.AuthMeta
triggerIndex int

ServerAddress string `keda:"name=serverAddress, parsingOrder=triggerMetadata"`
Query string `keda:"name=query, parsingOrder=triggerMetadata"`
QueryParameters map[string]string `keda:"name=queryParameters, parsingOrder=triggerMetadata, optional"`
Threshold float64 `keda:"name=threshold, parsingOrder=triggerMetadata"`
ActivationThreshold float64 `keda:"name=activationThreshold, parsingOrder=triggerMetadata, optional"`
Namespace string `keda:"name=namespace, parsingOrder=triggerMetadata, optional"`
CustomHeaders map[string]string `keda:"name=customHeaders, parsingOrder=triggerMetadata, optional"`
IgnoreNullValues bool `keda:"name=ignoreNullValues, parsingOrder=triggerMetadata, optional, default=true"`
UnsafeSSL bool `keda:"name=unsafeSsl, parsingOrder=triggerMetadata, optional"`
CortexOrgID string `keda:"name=cortexOrgID, parsingOrder=triggerMetadata, optional, deprecated=use customHeaders instead"`
}

type promQueryResult struct {
Expand Down Expand Up @@ -93,7 +91,7 @@ func NewPrometheusScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
return nil, fmt.Errorf("error parsing prometheus metadata: %w", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.unsafeSsl)
httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.UnsafeSSL)

if meta.prometheusAuth != nil {
if meta.prometheusAuth.CA != "" || meta.prometheusAuth.EnableTLS {
Expand Down Expand Up @@ -153,91 +151,11 @@ func NewPrometheusScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

func parsePrometheusMetadata(config *scalersconfig.ScalerConfig) (meta *prometheusMetadata, err error) {
meta = &prometheusMetadata{}

if val, ok := config.TriggerMetadata[promServerAddress]; ok && val != "" {
meta.serverAddress = val
} else {
return nil, fmt.Errorf("no %s given", promServerAddress)
}

if val, ok := config.TriggerMetadata[promQuery]; ok && val != "" {
meta.query = val
} else {
return nil, fmt.Errorf("no %s given", promQuery)
}

if val, ok := config.TriggerMetadata[promQueryParameters]; ok && val != "" {
queryParameters, err := kedautil.ParseStringList(val)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %w", promQueryParameters, err)
}

meta.queryParameters = queryParameters
}

if val, ok := config.TriggerMetadata[promThreshold]; ok && val != "" {
t, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %w", promThreshold, err)
}

meta.threshold = t
} else {
if config.AsMetricSource {
meta.threshold = 0
} else {
return nil, fmt.Errorf("no %s given", promThreshold)
}
}

meta.activationThreshold = 0
if val, ok := config.TriggerMetadata[promActivationThreshold]; ok {
t, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationThreshold parsing error %w", err)
}

meta.activationThreshold = t
}

if val, ok := config.TriggerMetadata[promNamespace]; ok && val != "" {
meta.namespace = val
}

if val, ok := config.TriggerMetadata[promCortexScopeOrgID]; ok && val != "" {
return nil, fmt.Errorf("cortexOrgID is deprecated, please use customHeaders instead")
}

if val, ok := config.TriggerMetadata[promCustomHeaders]; ok && val != "" {
customHeaders, err := kedautil.ParseStringList(val)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %w", promCustomHeaders, err)
}

meta.customHeaders = customHeaders
}

meta.ignoreNullValues = defaultIgnoreNullValues
if val, ok := config.TriggerMetadata[ignoreNullValues]; ok && val != "" {
ignoreNullValues, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("err incorrect value for ignoreNullValues given: %s, please use true or false", val)
}
meta.ignoreNullValues = ignoreNullValues
}

meta.unsafeSsl = false
if val, ok := config.TriggerMetadata[unsafeSsl]; ok && val != "" {
unsafeSslValue, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %w", unsafeSsl, err)
}

meta.unsafeSsl = unsafeSslValue
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing prometheus metadata: %w", err)
}

meta.triggerIndex = config.TriggerIndex

err = parseAuthConfig(config, meta)
if err != nil {
return nil, err
Expand Down Expand Up @@ -274,7 +192,7 @@ func (s *prometheusScaler) GetMetricSpecForScaling(context.Context) []v2.MetricS
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.threshold),
Target: GetMetricTargetMili(s.metricType, s.metadata.Threshold),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
Expand All @@ -284,15 +202,15 @@ func (s *prometheusScaler) GetMetricSpecForScaling(context.Context) []v2.MetricS

func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error) {
t := time.Now().UTC().Format(time.RFC3339)
queryEscaped := url_pkg.QueryEscape(s.metadata.query)
url := fmt.Sprintf("%s/api/v1/query?query=%s&time=%s", s.metadata.serverAddress, queryEscaped, t)
queryEscaped := url_pkg.QueryEscape(s.metadata.Query)
url := fmt.Sprintf("%s/api/v1/query?query=%s&time=%s", s.metadata.ServerAddress, queryEscaped, t)

// set 'namespace' parameter for namespaced Prometheus requests (e.g. for Thanos Querier)
if s.metadata.namespace != "" {
url = fmt.Sprintf("%s&namespace=%s", url, s.metadata.namespace)
if s.metadata.Namespace != "" {
url = fmt.Sprintf("%s&namespace=%s", url, s.metadata.Namespace)
}

for queryParameterKey, queryParameterValue := range s.metadata.queryParameters {
for queryParameterKey, queryParameterValue := range s.metadata.QueryParameters {
queryParameterKeyEscaped := url_pkg.QueryEscape(queryParameterKey)
queryParameterValueEscaped := url_pkg.QueryEscape(queryParameterValue)
url = fmt.Sprintf("%s&%s=%s", url, queryParameterKeyEscaped, queryParameterValueEscaped)
Expand All @@ -303,7 +221,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error
return -1, err
}

for headerName, headerValue := range s.metadata.customHeaders {
for headerName, headerValue := range s.metadata.CustomHeaders {
req.Header.Add(headerName, headerValue)
}

Expand Down Expand Up @@ -345,22 +263,22 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error

// allow for zero element or single element result sets
if len(result.Data.Result) == 0 {
if s.metadata.ignoreNullValues {
if s.metadata.IgnoreNullValues {
return 0, nil
}
return -1, fmt.Errorf("prometheus metrics 'prometheus' target may be lost, the result is empty")
} else if len(result.Data.Result) > 1 {
return -1, fmt.Errorf("prometheus query %s returned multiple elements", s.metadata.query)
return -1, fmt.Errorf("prometheus query %s returned multiple elements", s.metadata.Query)
}

valueLen := len(result.Data.Result[0].Value)
if valueLen == 0 {
if s.metadata.ignoreNullValues {
if s.metadata.IgnoreNullValues {
return 0, nil
}
return -1, fmt.Errorf("prometheus metrics 'prometheus' target may be lost, the value list is empty")
} else if valueLen < 2 {
return -1, fmt.Errorf("prometheus query %s didn't return enough values", s.metadata.query)
return -1, fmt.Errorf("prometheus query %s didn't return enough values", s.metadata.Query)
}

val := result.Data.Result[0].Value[1]
Expand All @@ -374,7 +292,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error
}

if math.IsInf(v, 0) {
if s.metadata.ignoreNullValues {
if s.metadata.IgnoreNullValues {
return 0, nil
}
err := fmt.Errorf("promtheus query returns %f", v)
Expand All @@ -394,5 +312,5 @@ func (s *prometheusScaler) GetMetricsAndActivity(ctx context.Context, metricName

metric := GenerateMetricInMili(metricName, val)

return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.activationThreshold, nil
return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.ActivationThreshold, nil
}
18 changes: 9 additions & 9 deletions pkg/scalers/prometheus_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ func TestPrometheusScalerExecutePromQuery(t *testing.T) {

scaler := prometheusScaler{
metadata: &prometheusMetadata{
serverAddress: server.URL,
ignoreNullValues: testData.ignoreNullValues,
unsafeSsl: testData.unsafeSsl,
ServerAddress: server.URL,
IgnoreNullValues: testData.ignoreNullValues,
UnsafeSSL: testData.unsafeSsl,
},
httpClient: http.DefaultClient,
logger: logr.Discard(),
Expand Down Expand Up @@ -366,9 +366,9 @@ func TestPrometheusScalerCustomHeaders(t *testing.T) {

scaler := prometheusScaler{
metadata: &prometheusMetadata{
serverAddress: server.URL,
customHeaders: customHeadersValue,
ignoreNullValues: testData.ignoreNullValues,
ServerAddress: server.URL,
CustomHeaders: customHeadersValue,
IgnoreNullValues: testData.ignoreNullValues,
},
httpClient: http.DefaultClient,
}
Expand Down Expand Up @@ -410,9 +410,9 @@ func TestPrometheusScalerExecutePromQueryParameters(t *testing.T) {
}))
scaler := prometheusScaler{
metadata: &prometheusMetadata{
serverAddress: server.URL,
queryParameters: queryParametersValue,
ignoreNullValues: testData.ignoreNullValues,
ServerAddress: server.URL,
QueryParameters: queryParametersValue,
IgnoreNullValues: testData.ignoreNullValues,
},
httpClient: http.DefaultClient,
}
Expand Down

0 comments on commit 0cf6cd0

Please sign in to comment.