From 98a2f5cde2c1b147ce5900a79e5cd70c6ddd0233 Mon Sep 17 00:00:00 2001 From: alex60217101990 Date: Tue, 11 Jan 2022 13:54:43 +0200 Subject: [PATCH 1/7] add auth logic for prometheus and predictkube scalers Signed-off-by: alex60217101990 --- go.mod | 3 +- go.sum | 2 + .../authentication/authentication_helpers.go | 158 +++++++++++ .../authentication/authentication_types.go | 34 +++ .../authentication/transporttype_enum.go | 260 ++++++++++++++++++ pkg/scalers/gcp_pub_sub_scaler.go | 4 +- pkg/scalers/predictkube_scaler.go | 102 +++---- pkg/scalers/prometheus_scaler.go | 115 ++------ pkg/scalers/prometheus_scaler_test.go | 6 +- 9 files changed, 540 insertions(+), 144 deletions(-) create mode 100644 pkg/scalers/authentication/authentication_helpers.go create mode 100644 pkg/scalers/authentication/transporttype_enum.go diff --git a/go.mod b/go.mod index 4f9e53a47d4..c87385cb397 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/Shopify/sarama v1.30.0 github.com/aws/aws-sdk-go v1.42.16 github.com/denisenkom/go-mssqldb v0.11.0 - github.com/dysnix/predictkube-libs v0.0.0-20211223143509-07a69ffd545e + github.com/dysnix/predictkube-libs v0.0.0-20220110175435-6a14c5918e22 github.com/dysnix/predictkube-proto v0.0.0-20211223141524-d309509b6b5f github.com/elastic/go-elasticsearch/v7 v7.15.1 github.com/go-logr/logr v0.4.0 @@ -179,6 +179,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/oklog/run v1.0.0 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect diff --git a/go.sum b/go.sum index 2e4fc63bfe8..2070cffab7f 100644 --- a/go.sum +++ b/go.sum @@ -256,6 +256,8 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dysnix/predictkube-libs v0.0.0-20211223143509-07a69ffd545e h1:4IEmV5r8U2RcvYcxT9SLpEUEyzTAbPE6if3z9q7aWZo= github.com/dysnix/predictkube-libs v0.0.0-20211223143509-07a69ffd545e/go.mod h1:dGl9trkmU8Cvh2ClgG68P8hBZRDoHmEynXt745J/T4U= +github.com/dysnix/predictkube-libs v0.0.0-20220110175435-6a14c5918e22 h1:NjwMoRvZMK2xf6Yh+sASF3oICkqSZh+bJf75DuBslZ4= +github.com/dysnix/predictkube-libs v0.0.0-20220110175435-6a14c5918e22/go.mod h1:WrLfDUxV7bb1OiF6LFeXPO45FlPcHdG7LIQov/JPR2E= github.com/dysnix/predictkube-proto v0.0.0-20211223141524-d309509b6b5f h1:56GoyLUD9Z3+Ko0iC8hGPq2RPvjceQEdbio78i5mhvQ= github.com/dysnix/predictkube-proto v0.0.0-20211223141524-d309509b6b5f/go.mod h1:zTsQdEyzxs3OHHtrjf8WpmexujIMTYyCVz/38VCt0uo= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= diff --git a/pkg/scalers/authentication/authentication_helpers.go b/pkg/scalers/authentication/authentication_helpers.go new file mode 100644 index 00000000000..bb5b9af2988 --- /dev/null +++ b/pkg/scalers/authentication/authentication_helpers.go @@ -0,0 +1,158 @@ +package authentication + +import ( + "crypto/tls" + "errors" + "fmt" + "net" + "net/http" + "strings" + "time" + + pConfig "github.com/prometheus/common/config" + + libs "github.com/dysnix/predictkube-libs/external/configs" + "github.com/dysnix/predictkube-libs/external/http_transport" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +const ( + authModesKey = "authModes" +) + +func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMeta, err error) { + authModes, ok := triggerMetadata[authModesKey] + // no authMode specified + if !ok { + return nil, nil + } + + authTypes := strings.Split(authModes, ",") + for _, t := range authTypes { + authType := Type(strings.TrimSpace(t)) + + out = &AuthMeta{} + + switch authType { + case BearerAuthType: + if len(authParams["bearerToken"]) == 0 { + return nil, errors.New("no bearer token provided") + } + if out.EnableBasicAuth { + return nil, errors.New("beare and basic authentication can not be set both") + } + + out.BearerToken = authParams["bearerToken"] + out.EnableBearerAuth = true + case BasicAuthType: + if len(authParams["username"]) == 0 { + return nil, errors.New("no username given") + } + if out.EnableBearerAuth { + return nil, errors.New("beare and basic authentication can not be set both") + } + + out.Username = authParams["username"] + // password is optional. For convenience, many application implement basic auth with + // username as apikey and password as empty + out.Password = authParams["password"] + out.EnableBasicAuth = true + case TLSAuthType: + if len(authParams["cert"]) == 0 { + return nil, errors.New("no cert given") + } + out.Cert = authParams["cert"] + + if len(authParams["key"]) == 0 { + return nil, errors.New("no key given") + } + + out.Key = authParams["key"] + out.EnableTLS = true + default: + return nil, fmt.Errorf("err incorrect value for authMode is given: %s", t) + } + } + + if len(authParams["ca"]) > 0 { + out.CA = authParams["ca"] + } + + return out, err +} + +func CreateHTTPRoundTripper(roundTripperType TransportType, auth *AuthMeta, conf ...*HTTPTransport) (rt http.RoundTripper, err error) { + tlsConfig := &tls.Config{InsecureSkipVerify: false} + if auth != nil && (auth.CA != "" || auth.EnableTLS) { + tlsConfig, err = kedautil.NewTLSConfig( + auth.Cert, + auth.Key, + auth.CA, + ) + if err != nil || tlsConfig == nil { + return nil, fmt.Errorf("error creating the TLS config: %s", err) + } + } + + switch roundTripperType { + case NetHTTP: + // from official github.com/prometheus/client_golang/api package + return &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: tlsConfig, + }, nil + case FastHTTP: + // default configs + httpConf := &libs.HTTPTransport{ + MaxIdleConnDuration: 10, + ReadTimeout: time.Second * 15, + WriteTimeout: time.Second * 15, + } + + if len(conf) > 0 { + httpConf = &libs.HTTPTransport{ + MaxIdleConnDuration: conf[0].MaxIdleConnDuration, + ReadTimeout: conf[0].ReadTimeout, + WriteTimeout: conf[0].WriteTimeout, + } + } + + var roundTripper http.RoundTripper + if roundTripper, err = http_transport.NewHttpTransport( + libs.SetTransportConfigs(httpConf), + libs.SetTLS(tlsConfig), + ); err != nil { + return nil, fmt.Errorf("error creating fast http round tripper: %s", err) + } + + if auth != nil { + if auth.EnableBasicAuth { + rt = pConfig.NewBasicAuthRoundTripper( + auth.Username, + pConfig.Secret(auth.Password), + "", roundTripper, + ) + } + + if auth.EnableBearerAuth { + rt = pConfig.NewAuthorizationCredentialsRoundTripper( + "Bearer", + pConfig.Secret(auth.BearerToken), + roundTripper, + ) + } + } else { + rt = roundTripper + } + + return rt, nil + } + + return rt, nil +} diff --git a/pkg/scalers/authentication/authentication_types.go b/pkg/scalers/authentication/authentication_types.go index 6764af42a9a..a69d2b3a016 100644 --- a/pkg/scalers/authentication/authentication_types.go +++ b/pkg/scalers/authentication/authentication_types.go @@ -1,5 +1,7 @@ package authentication +import "time" + // Type describes the authentication type used in a scaler type Type string @@ -13,3 +15,35 @@ const ( // BearerAuthType is a auth type using a bearer token BearerAuthType Type = "bearer" ) + +//go:generate go-enum -type=TransportType -transform=lower +// TransportType is type of http transport +type TransportType int + +const ( + NetHTTP TransportType = iota // NetHTTP standard Go net/http client. + FastHTTP // FastHTTP Fast http client. +) + +type AuthMeta struct { + // bearer auth + EnableBearerAuth bool + BearerToken string + + // basic auth + EnableBasicAuth bool + Username string + Password string // +optional + + // client certification + EnableTLS bool + Cert string + Key string + CA string +} + +type HTTPTransport struct { + MaxIdleConnDuration time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration +} diff --git a/pkg/scalers/authentication/transporttype_enum.go b/pkg/scalers/authentication/transporttype_enum.go new file mode 100644 index 00000000000..ef75ee1ca21 --- /dev/null +++ b/pkg/scalers/authentication/transporttype_enum.go @@ -0,0 +1,260 @@ +// Code generated by "go-enum -type=TransportType -transform=lower"; DO NOT EDIT. + +package authentication + +import ( + "database/sql" + "database/sql/driver" + "encoding" + "encoding/json" + "fmt" + "strconv" +) + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[NetHTTP-0] + _ = x[FastHTTP-1] +} + +const _TransportType_name = "nethttpfasthttp" + +var _TransportType_index = [...]uint8{0, 7, 15} + +func _() { + var _nil_TransportType_value = func() (val TransportType) { return }() + + // An "cannot convert TransportType literal (type TransportType) to type fmt.Stringer" compiler error signifies that the base type have changed. + // Re-run the go-enum command to generate them again. + var _ fmt.Stringer = _nil_TransportType_value +} + +func (i TransportType) String() string { + if i < 0 || i >= TransportType(len(_TransportType_index)-1) { + return "TransportType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _TransportType_name[_TransportType_index[i]:_TransportType_index[i+1]] +} + +// New returns a pointer to a new addr filled with the TransportType value passed in. +func (i TransportType) New() *TransportType { + clone := i + return &clone +} + +var _TransportType_values = []TransportType{0, 1} + +var _TransportType_name_to_values = map[string]TransportType{ + _TransportType_name[0:7]: 0, + _TransportType_name[7:15]: 1, +} + +// ParseTransportTypeString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func ParseTransportTypeString(s string) (TransportType, error) { + if val, ok := _TransportType_name_to_values[s]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to TransportType values", s) +} + +// TransportTypeValues returns all values of the enum +func TransportTypeValues() []TransportType { + return _TransportType_values +} + +// IsATransportType returns "true" if the value is listed in the enum definition. "false" otherwise +func (i TransportType) Registered() bool { + for _, v := range _TransportType_values { + if i == v { + return true + } + } + return false +} + +func _() { + var _nil_TransportType_value = func() (val TransportType) { return }() + + // An "cannot convert TransportType literal (type TransportType) to type encoding.BinaryMarshaler" compiler error signifies that the base type have changed. + // Re-run the go-enum command to generate them again. + var _ encoding.BinaryMarshaler = &_nil_TransportType_value + + // An "cannot convert TransportType literal (type TransportType) to type encoding.BinaryUnmarshaler" compiler error signifies that the base type have changed. + // Re-run the go-enum command to generate them again. + var _ encoding.BinaryUnmarshaler = &_nil_TransportType_value +} + +// MarshalBinary implements the encoding.BinaryMarshaler interface for TransportType +func (i TransportType) MarshalBinary() (data []byte, err error) { + return []byte(i.String()), nil +} + +// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface for TransportType +func (i *TransportType) UnmarshalBinary(data []byte) error { + var err error + *i, err = ParseTransportTypeString(string(data)) + return err +} + +func _() { + var _nil_TransportType_value = func() (val TransportType) { return }() + + // An "cannot convert TransportType literal (type TransportType) to type json.Marshaler" compiler error signifies that the base type have changed. + // Re-run the go-enum command to generate them again. + var _ json.Marshaler = _nil_TransportType_value + + // An "cannot convert TransportType literal (type TransportType) to type encoding.Unmarshaler" compiler error signifies that the base type have changed. + // Re-run the go-enum command to generate them again. + var _ json.Unmarshaler = &_nil_TransportType_value +} + +// MarshalJSON implements the json.Marshaler interface for TransportType +func (i TransportType) MarshalJSON() ([]byte, error) { + return json.Marshal(i.String()) +} + +// UnmarshalJSON implements the json.Unmarshaler interface for TransportType +func (i *TransportType) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return fmt.Errorf("TransportType should be a string, got %s", data) + } + + var err error + *i, err = ParseTransportTypeString(s) + return err +} + +func _() { + var _nil_TransportType_value = func() (val TransportType) { return }() + + // An "cannot convert TransportType literal (type TransportType) to type encoding.TextMarshaler" compiler error signifies that the base type have changed. + // Re-run the go-enum command to generate them again. + var _ encoding.TextMarshaler = _nil_TransportType_value + + // An "cannot convert TransportType literal (type TransportType) to type encoding.TextUnmarshaler" compiler error signifies that the base type have changed. + // Re-run the go-enum command to generate them again. + var _ encoding.TextUnmarshaler = &_nil_TransportType_value +} + +// MarshalText implements the encoding.TextMarshaler interface for TransportType +func (i TransportType) MarshalText() ([]byte, error) { + return []byte(i.String()), nil +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface for TransportType +func (i *TransportType) UnmarshalText(text []byte) error { + var err error + *i, err = ParseTransportTypeString(string(text)) + return err +} + +//func _() { +// var _nil_TransportType_value = func() (val TransportType) { return }() +// +// // An "cannot convert TransportType literal (type TransportType) to type yaml.Marshaler" compiler error signifies that the base type have changed. +// // Re-run the go-enum command to generate them again. +// var _ yaml.Marshaler = _nil_TransportType_value +// +// // An "cannot convert TransportType literal (type TransportType) to type yaml.Unmarshaler" compiler error signifies that the base type have changed. +// // Re-run the go-enum command to generate them again. +// var _ yaml.Unmarshaler = &_nil_TransportType_value +//} + +// MarshalYAML implements a YAML Marshaler for TransportType +func (i TransportType) MarshalYAML() (interface{}, error) { + return i.String(), nil +} + +// UnmarshalYAML implements a YAML Unmarshaler for TransportType +func (i *TransportType) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + + var err error + *i, err = ParseTransportTypeString(s) + return err +} + +func _() { + var _nil_TransportType_value = func() (val TransportType) { return }() + + // An "cannot convert TransportType literal (type TransportType) to type driver.Valuer" compiler error signifies that the base type have changed. + // Re-run the go-enum command to generate them again. + var _ driver.Valuer = _nil_TransportType_value + + // An "cannot convert TransportType literal (type TransportType) to type sql.Scanner" compiler error signifies that the base type have changed. + // Re-run the go-enum command to generate them again. + var _ sql.Scanner = &_nil_TransportType_value +} + +func (i TransportType) Value() (driver.Value, error) { + return i.String(), nil +} + +func (i *TransportType) Scan(value interface{}) error { + if value == nil { + return nil + } + + str, ok := value.(string) + if !ok { + bytes, ok := value.([]byte) + if !ok { + return fmt.Errorf("value is not a byte slice") + } + + str = string(bytes[:]) + } + + val, err := ParseTransportTypeString(str) + if err != nil { + return err + } + + *i = val + return nil +} + +// TransportTypeSliceContains reports whether sunEnums is within enums. +func TransportTypeSliceContains(enums []TransportType, sunEnums ...TransportType) bool { + var seenEnums = map[TransportType]bool{} + for _, e := range sunEnums { + seenEnums[e] = false + } + + for _, v := range enums { + if _, has := seenEnums[v]; has { + seenEnums[v] = true + } + } + + for _, seen := range seenEnums { + if !seen { + return false + } + } + + return true +} + +// TransportTypeSliceContainsAny reports whether any sunEnum is within enums. +func TransportTypeSliceContainsAny(enums []TransportType, sunEnums ...TransportType) bool { + var seenEnums = map[TransportType]struct{}{} + for _, e := range sunEnums { + seenEnums[e] = struct{}{} + } + + for _, v := range enums { + if _, has := seenEnums[v]; has { + return true + } + } + + return false +} diff --git a/pkg/scalers/gcp_pub_sub_scaler.go b/pkg/scalers/gcp_pub_sub_scaler.go index 64a5e558cb0..bb5a14e295a 100644 --- a/pkg/scalers/gcp_pub_sub_scaler.go +++ b/pkg/scalers/gcp_pub_sub_scaler.go @@ -183,7 +183,7 @@ func (s *pubsubScaler) GetMetricSpecForScaling(context.Context) []v2beta2.Metric } // GetMetrics connects to Stack Driver and finds the size of the pub sub subscription -func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { +func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string, _ labels.Selector) ([]external_metrics.ExternalMetricValue, error) { var value int64 var err error @@ -244,7 +244,7 @@ func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (int64 func getSubscriptionData(s *pubsubScaler) (string, string) { var subscriptionID string var projectID string - regexpExpression, _ := regexp.Compile(compositeSubscriptionIDPrefix) + regexpExpression := regexp.MustCompile(compositeSubscriptionIDPrefix) if regexpExpression.MatchString(s.metadata.subscriptionName) { subscriptionID = strings.Split(s.metadata.subscriptionName, "/")[3] projectID = strings.Split(s.metadata.subscriptionName, "/")[1] diff --git a/pkg/scalers/predictkube_scaler.go b/pkg/scalers/predictkube_scaler.go index e890353beff..60befbb2d6d 100644 --- a/pkg/scalers/predictkube_scaler.go +++ b/pkg/scalers/predictkube_scaler.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math" + "net/http" "strconv" "time" @@ -28,11 +29,11 @@ import ( libs "github.com/dysnix/predictkube-libs/external/configs" pc "github.com/dysnix/predictkube-libs/external/grpc/client" - "github.com/dysnix/predictkube-libs/external/http_transport" tc "github.com/dysnix/predictkube-libs/external/types_convertation" "github.com/dysnix/predictkube-proto/external/proto/commonproto" pb "github.com/dysnix/predictkube-proto/external/proto/services" + "github.com/kedacore/keda/v2/pkg/scalers/authentication" kedautil "github.com/kedacore/keda/v2/pkg/util" ) @@ -81,7 +82,6 @@ type PredictKubeScaler struct { grpcClient pb.MlEngineServiceClient healthClient health.HealthClient api v1.API - transport http_transport.FastHttpTransport } type predictKubeMetadata struct { @@ -90,6 +90,7 @@ type predictKubeMetadata struct { stepDuration time.Duration apiKey string prometheusAddress string + prometheusAuth *authentication.AuthMeta metricName string query string threshold int64 @@ -98,7 +99,7 @@ type predictKubeMetadata struct { var predictKubeLog = logf.Log.WithName("predictkube_scaler") -func (pks *PredictKubeScaler) setupClientConn() error { +func (s *PredictKubeScaler) setupClientConn() error { clientOpt, err := pc.SetGrpcClientOptions(grpcConf, &libs.Base{ Monitoring: libs.Monitoring{ @@ -111,7 +112,7 @@ func (pks *PredictKubeScaler) setupClientConn() error { Enabled: false, }, }, - pc.InjectPublicClientMetadataInterceptor(pks.metadata.apiKey), + pc.InjectPublicClientMetadataInterceptor(s.metadata.apiKey), ) if !grpcConf.Conn.Insecure { @@ -126,13 +127,13 @@ func (pks *PredictKubeScaler) setupClientConn() error { return err } - pks.grpcConn, err = grpc.Dial(fmt.Sprintf("%s:%d", mlEngineHost, mlEnginePort), clientOpt...) + s.grpcConn, err = grpc.Dial(fmt.Sprintf("%s:%d", mlEngineHost, mlEnginePort), clientOpt...) if err != nil { return err } - pks.grpcClient = pb.NewMlEngineServiceClient(pks.grpcConn) - pks.healthClient = health.NewHealthClient(pks.grpcConn) + s.grpcClient = pb.NewMlEngineServiceClient(s.grpcConn) + s.healthClient = health.NewHealthClient(s.grpcConn) return err } @@ -165,13 +166,13 @@ func NewPredictKubeScaler(ctx context.Context, config *ScalerConfig) (*PredictKu } // IsActive returns true if we are able to get metrics from PredictKube -func (pks *PredictKubeScaler) IsActive(ctx context.Context) (bool, error) { - results, err := pks.doQuery(ctx) +func (s *PredictKubeScaler) IsActive(ctx context.Context) (bool, error) { + results, err := s.doQuery(ctx) if err != nil { return false, err } - resp, err := pks.healthClient.Check(ctx, &health.HealthCheckRequest{}) + resp, err := s.healthClient.Check(ctx, &health.HealthCheckRequest{}) if resp == nil { return len(results) > 0, fmt.Errorf("can't connect grpc server: empty server response, code: %v", codes.Unknown) @@ -184,17 +185,16 @@ func (pks *PredictKubeScaler) IsActive(ctx context.Context) (bool, error) { return len(results) > 0, nil } -func (pks *PredictKubeScaler) Close(_ context.Context) error { - pks.transport.Close() - return pks.grpcConn.Close() +func (s *PredictKubeScaler) Close(_ context.Context) error { + return s.grpcConn.Close() } -func (pks *PredictKubeScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { - targetMetricValue := resource.NewQuantity(pks.metadata.threshold, resource.DecimalSI) - metricName := kedautil.NormalizeString(fmt.Sprintf("predictkube-%s", pks.metadata.metricName)) +func (s *PredictKubeScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(s.metadata.threshold, resource.DecimalSI) + metricName := kedautil.NormalizeString(fmt.Sprintf("predictkube-%s", s.metadata.metricName)) externalMetric := &v2beta2.ExternalMetricSource{ Metric: v2beta2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(pks.metadata.scalerIndex, metricName), + Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, metricName), }, Target: v2beta2.MetricTarget{ Type: v2beta2.AverageValueMetricType, @@ -208,8 +208,8 @@ func (pks *PredictKubeScaler) GetMetricSpecForScaling(context.Context) []v2beta2 return []v2beta2.MetricSpec{metricSpec} } -func (pks *PredictKubeScaler) GetMetrics(ctx context.Context, metricName string, _ labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - value, err := pks.doPredictRequest(ctx) +func (s *PredictKubeScaler) GetMetrics(ctx context.Context, metricName string, _ labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + value, err := s.doPredictRequest(ctx) if err != nil { predictKubeLog.Error(err, "error executing query to predict controller service") return []external_metrics.ExternalMetricValue{}, err @@ -234,14 +234,14 @@ func (pks *PredictKubeScaler) GetMetrics(ctx context.Context, metricName string, return append([]external_metrics.ExternalMetricValue{}, metric), nil } -func (pks *PredictKubeScaler) doPredictRequest(ctx context.Context) (int64, error) { - results, err := pks.doQuery(ctx) +func (s *PredictKubeScaler) doPredictRequest(ctx context.Context) (int64, error) { + results, err := s.doQuery(ctx) if err != nil { return 0, err } - resp, err := pks.grpcClient.GetPredictMetric(ctx, &pb.ReqGetPredictMetric{ - ForecastHorizon: uint64(math.Round(float64(pks.metadata.predictHorizon / pks.metadata.stepDuration))), + resp, err := s.grpcClient.GetPredictMetric(ctx, &pb.ReqGetPredictMetric{ + ForecastHorizon: uint64(math.Round(float64(s.metadata.predictHorizon / s.metadata.stepDuration))), Observations: results, }) @@ -264,20 +264,20 @@ func (pks *PredictKubeScaler) doPredictRequest(ctx context.Context) (int64, erro }(x, y), nil } -func (pks *PredictKubeScaler) doQuery(ctx context.Context) ([]*commonproto.Item, error) { +func (s *PredictKubeScaler) doQuery(ctx context.Context) ([]*commonproto.Item, error) { currentTime := time.Now().UTC() - if pks.metadata.stepDuration == 0 { - pks.metadata.stepDuration = defaultStep + if s.metadata.stepDuration == 0 { + s.metadata.stepDuration = defaultStep } r := v1.Range{ - Start: currentTime.Add(-pks.metadata.historyTimeWindow), + Start: currentTime.Add(-s.metadata.historyTimeWindow), End: currentTime, - Step: pks.metadata.stepDuration, + Step: s.metadata.stepDuration, } - val, warns, err := pks.api.QueryRange(ctx, pks.metadata.query, r) + val, warns, err := s.api.QueryRange(ctx, s.metadata.query, r) if len(warns) > 0 { predictKubeLog.V(1).Info("warnings", warns) @@ -287,11 +287,11 @@ func (pks *PredictKubeScaler) doQuery(ctx context.Context) ([]*commonproto.Item, return nil, err } - return pks.parsePrometheusResult(val) + return s.parsePrometheusResult(val) } -func (pks *PredictKubeScaler) parsePrometheusResult(result model.Value) (out []*commonproto.Item, err error) { - metricName := GenerateMetricNameWithIndex(pks.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("predictkube-%s", pks.metadata.metricName))) +func (s *PredictKubeScaler) parsePrometheusResult(result model.Value) (out []*commonproto.Item, err error) { + metricName := GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("predictkube-%s", s.metadata.metricName))) switch result.Type() { case model.ValVector: if res, ok := result.(model.Vector); ok { @@ -452,33 +452,39 @@ func parsePredictKubeMetadata(config *ScalerConfig) (result *predictKubeMetadata return nil, fmt.Errorf("no api key given") } + meta.prometheusAuth, err = authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams) + if err != nil { + return nil, err + } + return &meta, nil } -func (pks *PredictKubeScaler) ping(ctx context.Context) (err error) { - _, err = pks.api.Runtimeinfo(ctx) +func (s *PredictKubeScaler) ping(ctx context.Context) (err error) { + _, err = s.api.Runtimeinfo(ctx) return err } // initPredictKubePrometheusConn init prometheus client and setup connection to API -func (pks *PredictKubeScaler) initPredictKubePrometheusConn(ctx context.Context) (err error) { - pks.transport = http_transport.NewTransport(&libs.HTTPTransport{ - MaxIdleConnDuration: 10, - ReadTimeout: time.Second * 15, - WriteTimeout: time.Second * 15, - }) - - pks.prometheusClient, err = api.NewClient(api.Config{ - Address: pks.metadata.prometheusAddress, - RoundTripper: pks.transport, - }) +func (s *PredictKubeScaler) initPredictKubePrometheusConn(ctx context.Context) (err error) { + var roundTripper http.RoundTripper + if roundTripper, err = authentication.CreateHTTPRoundTripper( + authentication.FastHTTP, + s.metadata.prometheusAuth, + ); err != nil { + predictKubeLog.V(1).Error(err, "init Prometheus client http transport") + return err + } - if err != nil { + if s.prometheusClient, err = api.NewClient(api.Config{ + Address: s.metadata.prometheusAddress, + RoundTripper: roundTripper, + }); err != nil { predictKubeLog.V(1).Error(err, "init Prometheus client") return err } - pks.api = v1.NewAPI(pks.prometheusClient) + s.api = v1.NewAPI(s.prometheusClient) - return pks.ping(ctx) + return s.ping(ctx) } diff --git a/pkg/scalers/prometheus_scaler.go b/pkg/scalers/prometheus_scaler.go index fadad7baa8a..8b170ec50be 100644 --- a/pkg/scalers/prometheus_scaler.go +++ b/pkg/scalers/prometheus_scaler.go @@ -3,13 +3,11 @@ package scalers import ( "context" "encoding/json" - "errors" "fmt" "io/ioutil" "net/http" url_pkg "net/url" "strconv" - "strings" "time" v2beta2 "k8s.io/api/autoscaling/v2beta2" @@ -36,27 +34,12 @@ type prometheusScaler struct { } type prometheusMetadata struct { - serverAddress string - metricName string - query string - threshold int - - // bearer auth - enableBearerAuth bool - bearerToken string - - // basic auth - enableBasicAuth bool - username string - password string // +optional - - // client certification - enableTLS bool - cert string - key string - ca string - - scalerIndex int + serverAddress string + metricName string + query string + threshold int + prometheusAuth *authentication.AuthMeta + scalerIndex int } type promQueryResult struct { @@ -82,13 +65,14 @@ func NewPrometheusScaler(config *ScalerConfig) (Scaler, error) { httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) - if meta.ca != "" || meta.enableTLS { - config, err := kedautil.NewTLSConfig(meta.cert, meta.key, meta.ca) - if err != nil || config == nil { - return nil, fmt.Errorf("error creating the TLS config: %s", err) + if meta.prometheusAuth.CA != "" || meta.prometheusAuth.EnableTLS { + if httpClient.Transport, err = authentication.CreateHTTPRoundTripper( + authentication.NetHTTP, + meta.prometheusAuth, + ); err != nil { + predictKubeLog.V(1).Error(err, "init Prometheus client http transport") + return nil, err } - - httpClient.Transport = &http.Transport{TLSClientConfig: config} } return &prometheusScaler{ @@ -97,8 +81,8 @@ func NewPrometheusScaler(config *ScalerConfig) (Scaler, error) { }, nil } -func parsePrometheusMetadata(config *ScalerConfig) (*prometheusMetadata, error) { - meta := prometheusMetadata{} +func parsePrometheusMetadata(config *ScalerConfig) (meta *prometheusMetadata, err error) { + meta = &prometheusMetadata{} if val, ok := config.TriggerMetadata[promServerAddress]; ok && val != "" { meta.serverAddress = val @@ -129,61 +113,12 @@ func parsePrometheusMetadata(config *ScalerConfig) (*prometheusMetadata, error) meta.scalerIndex = config.ScalerIndex - authModes, ok := config.TriggerMetadata["authModes"] - // no authMode specified - if !ok { - return &meta, nil - } - - authTypes := strings.Split(authModes, ",") - for _, t := range authTypes { - authType := authentication.Type(strings.TrimSpace(t)) - switch authType { - case authentication.BearerAuthType: - if len(config.AuthParams["bearerToken"]) == 0 { - return nil, errors.New("no bearer token provided") - } - if meta.enableBasicAuth { - return nil, errors.New("beare and basic authentication can not be set both") - } - - meta.bearerToken = config.AuthParams["bearerToken"] - meta.enableBearerAuth = true - case authentication.BasicAuthType: - if len(config.AuthParams["username"]) == 0 { - return nil, errors.New("no username given") - } - if meta.enableBearerAuth { - return nil, errors.New("beare and basic authentication can not be set both") - } - - meta.username = config.AuthParams["username"] - // password is optional. For convenience, many application implement basic auth with - // username as apikey and password as empty - meta.password = config.AuthParams["password"] - meta.enableBasicAuth = true - case authentication.TLSAuthType: - if len(config.AuthParams["cert"]) == 0 { - return nil, errors.New("no cert given") - } - meta.cert = config.AuthParams["cert"] - - if len(config.AuthParams["key"]) == 0 { - return nil, errors.New("no key given") - } - - meta.key = config.AuthParams["key"] - meta.enableTLS = true - default: - return nil, fmt.Errorf("err incorrect value for authMode is given: %s", t) - } - } - - if len(config.AuthParams["ca"]) > 0 { - meta.ca = config.AuthParams["ca"] + meta.prometheusAuth, err = authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams) + if err != nil { + return nil, err } - return &meta, nil + return meta, nil } func (s *prometheusScaler) IsActive(ctx context.Context) (bool, error) { @@ -227,10 +162,10 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error return -1, err } - if s.metadata.enableBearerAuth { - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.metadata.bearerToken)) - } else if s.metadata.enableBasicAuth { - req.SetBasicAuth(s.metadata.username, s.metadata.password) + if s.metadata.prometheusAuth.EnableBearerAuth { + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.metadata.prometheusAuth.BearerToken)) + } else if s.metadata.prometheusAuth.EnableBasicAuth { + req.SetBasicAuth(s.metadata.prometheusAuth.Username, s.metadata.prometheusAuth.Password) } r, err := s.httpClient.Do(req) @@ -242,7 +177,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error if err != nil { return -1, err } - r.Body.Close() + _ = r.Body.Close() if !(r.StatusCode >= 200 && r.StatusCode <= 299) { return -1, fmt.Errorf("prometheus query api returned error. status: %d response: %s", r.StatusCode, string(b)) @@ -283,7 +218,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error return v, nil } -func (s *prometheusScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { +func (s *prometheusScaler) GetMetrics(ctx context.Context, metricName string, _ labels.Selector) ([]external_metrics.ExternalMetricValue, error) { val, err := s.ExecutePromQuery(ctx) if err != nil { prometheusLog.Error(err, "error executing prometheus query") diff --git a/pkg/scalers/prometheus_scaler_test.go b/pkg/scalers/prometheus_scaler_test.go index 3f4169502c0..3a73a7d10d4 100644 --- a/pkg/scalers/prometheus_scaler_test.go +++ b/pkg/scalers/prometheus_scaler_test.go @@ -114,9 +114,9 @@ func TestPrometheusScalerAuthParams(t *testing.T) { } if err == nil { - if (meta.enableBearerAuth && !strings.Contains(testData.metadata["authModes"], "bearer")) || - (meta.enableBasicAuth && !strings.Contains(testData.metadata["authModes"], "basic")) || - (meta.enableTLS && !strings.Contains(testData.metadata["authModes"], "tls")) { + if (meta.prometheusAuth.EnableBearerAuth && !strings.Contains(testData.metadata["authModes"], "bearer")) || + (meta.prometheusAuth.EnableBasicAuth && !strings.Contains(testData.metadata["authModes"], "basic")) || + (meta.prometheusAuth.EnableTLS && !strings.Contains(testData.metadata["authModes"], "tls")) { t.Error("wrong auth mode detected") } } From db228cd469acda85054cc88ace495dac6be2f307 Mon Sep 17 00:00:00 2001 From: alex60217101990 Date: Wed, 12 Jan 2022 09:01:41 +0200 Subject: [PATCH 2/7] add comments for auth logic Signed-off-by: alex60217101990 --- .github/workflows/main-build.yml | 1 + .github/workflows/nightly-e2e.yml | 1 + .github/workflows/pr-e2e.yml | 1 + .pre-commit-config.yaml | 2 +- pkg/scalers/predictkube_scaler.go | 2 ++ pkg/scalers/prometheus_scaler.go | 2 ++ 6 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/main-build.yml b/.github/workflows/main-build.yml index 901be142fd2..23bc81edefa 100644 --- a/.github/workflows/main-build.yml +++ b/.github/workflows/main-build.yml @@ -83,6 +83,7 @@ jobs: OPENSTACK_PASSWORD: ${{ secrets.OPENSTACK_PASSWORD }} OPENSTACK_PROJECT_ID: ${{ secrets.OPENSTACK_PROJECT_ID }} OPENSTACK_USER_ID: ${{ secrets.OPENSTACK_USER_ID }} + PREDICTKUBE_API_KEY: ${{ secrets.PREDICTKUBE_API_KEY }} TEST_LOG_ANALYTICS_WORKSPACE_ID: ${{ secrets.TEST_LOG_ANALYTICS_WORKSPACE_ID }} TEST_STORAGE_CONNECTION_STRING: ${{ secrets.TEST_STORAGE_CONNECTION_STRING }} run: make e2e-test diff --git a/.github/workflows/nightly-e2e.yml b/.github/workflows/nightly-e2e.yml index eaf6a7f3442..c3547c842f3 100644 --- a/.github/workflows/nightly-e2e.yml +++ b/.github/workflows/nightly-e2e.yml @@ -37,6 +37,7 @@ jobs: OPENSTACK_PASSWORD: ${{ secrets.OPENSTACK_PASSWORD }} OPENSTACK_PROJECT_ID: ${{ secrets.OPENSTACK_PROJECT_ID }} OPENSTACK_USER_ID: ${{ secrets.OPENSTACK_USER_ID }} + PREDICTKUBE_API_KEY: ${{ secrets.PREDICTKUBE_API_KEY }} TEST_LOG_ANALYTICS_WORKSPACE_ID: ${{ secrets.TEST_LOG_ANALYTICS_WORKSPACE_ID }} TEST_STORAGE_CONNECTION_STRING: ${{ secrets.TEST_STORAGE_CONNECTION_STRING }} run: make e2e-test diff --git a/.github/workflows/pr-e2e.yml b/.github/workflows/pr-e2e.yml index ebe3d06e29d..6f5b10e78b1 100644 --- a/.github/workflows/pr-e2e.yml +++ b/.github/workflows/pr-e2e.yml @@ -84,6 +84,7 @@ jobs: OPENSTACK_PASSWORD: ${{ secrets.OPENSTACK_PASSWORD }} OPENSTACK_PROJECT_ID: ${{ secrets.OPENSTACK_PROJECT_ID }} OPENSTACK_USER_ID: ${{ secrets.OPENSTACK_USER_ID }} + PREDICTKUBE_API_KEY: ${{ secrets.PREDICTKUBE_API_KEY }} TEST_CLUSTER_NAME: keda-pr-run TEST_LOG_ANALYTICS_WORKSPACE_ID: ${{ secrets.TEST_LOG_ANALYTICS_WORKSPACE_ID }} TEST_STORAGE_CONNECTION_STRING: ${{ secrets.TEST_STORAGE_CONNECTION_STRING }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1430e5c745f..f1df262a962 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,7 +1,7 @@ default_stages: [commit, push] minimum_pre_commit_version: "1.20.0" repos: - - repo: git://github.com/dnephin/pre-commit-golang + - repo: https://github.com/dnephin/pre-commit-golang rev: v0.3.5 hooks: - id: go-fmt diff --git a/pkg/scalers/predictkube_scaler.go b/pkg/scalers/predictkube_scaler.go index 60befbb2d6d..b5556a33db2 100644 --- a/pkg/scalers/predictkube_scaler.go +++ b/pkg/scalers/predictkube_scaler.go @@ -452,6 +452,7 @@ func parsePredictKubeMetadata(config *ScalerConfig) (result *predictKubeMetadata return nil, fmt.Errorf("no api key given") } + // parse auth configs from ScalerConfig meta.prometheusAuth, err = authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams) if err != nil { return nil, err @@ -468,6 +469,7 @@ func (s *PredictKubeScaler) ping(ctx context.Context) (err error) { // initPredictKubePrometheusConn init prometheus client and setup connection to API func (s *PredictKubeScaler) initPredictKubePrometheusConn(ctx context.Context) (err error) { var roundTripper http.RoundTripper + // create http.RoundTripper with auth settings from ScalerConfig if roundTripper, err = authentication.CreateHTTPRoundTripper( authentication.FastHTTP, s.metadata.prometheusAuth, diff --git a/pkg/scalers/prometheus_scaler.go b/pkg/scalers/prometheus_scaler.go index 8b170ec50be..a7e2dd6bcec 100644 --- a/pkg/scalers/prometheus_scaler.go +++ b/pkg/scalers/prometheus_scaler.go @@ -66,6 +66,7 @@ func NewPrometheusScaler(config *ScalerConfig) (Scaler, error) { httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) if meta.prometheusAuth.CA != "" || meta.prometheusAuth.EnableTLS { + // create http.RoundTripper with auth settings from ScalerConfig if httpClient.Transport, err = authentication.CreateHTTPRoundTripper( authentication.NetHTTP, meta.prometheusAuth, @@ -113,6 +114,7 @@ func parsePrometheusMetadata(config *ScalerConfig) (meta *prometheusMetadata, er meta.scalerIndex = config.ScalerIndex + // parse auth configs from ScalerConfig meta.prometheusAuth, err = authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams) if err != nil { return nil, err From a0f80d72ba9478228e0293e1e15f7033964c41c5 Mon Sep 17 00:00:00 2001 From: alex60217101990 Date: Wed, 12 Jan 2022 10:14:22 +0200 Subject: [PATCH 3/7] return pre-commit-config Signed-off-by: alex60217101990 --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f1df262a962..1430e5c745f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,7 +1,7 @@ default_stages: [commit, push] minimum_pre_commit_version: "1.20.0" repos: - - repo: https://github.com/dnephin/pre-commit-golang + - repo: git://github.com/dnephin/pre-commit-golang rev: v0.3.5 hooks: - id: go-fmt From 4b1334bff2785ea6f5d809a08ce29275397c0944 Mon Sep 17 00:00:00 2001 From: alex60217101990 Date: Wed, 12 Jan 2022 11:59:26 +0200 Subject: [PATCH 4/7] fix prometheus_scaler.go Signed-off-by: alex60217101990 --- pkg/scalers/gcp_pub_sub_scaler.go | 4 ++-- pkg/scalers/prometheus_scaler.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/scalers/gcp_pub_sub_scaler.go b/pkg/scalers/gcp_pub_sub_scaler.go index bb5a14e295a..64a5e558cb0 100644 --- a/pkg/scalers/gcp_pub_sub_scaler.go +++ b/pkg/scalers/gcp_pub_sub_scaler.go @@ -183,7 +183,7 @@ func (s *pubsubScaler) GetMetricSpecForScaling(context.Context) []v2beta2.Metric } // GetMetrics connects to Stack Driver and finds the size of the pub sub subscription -func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string, _ labels.Selector) ([]external_metrics.ExternalMetricValue, error) { +func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { var value int64 var err error @@ -244,7 +244,7 @@ func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (int64 func getSubscriptionData(s *pubsubScaler) (string, string) { var subscriptionID string var projectID string - regexpExpression := regexp.MustCompile(compositeSubscriptionIDPrefix) + regexpExpression, _ := regexp.Compile(compositeSubscriptionIDPrefix) if regexpExpression.MatchString(s.metadata.subscriptionName) { subscriptionID = strings.Split(s.metadata.subscriptionName, "/")[3] projectID = strings.Split(s.metadata.subscriptionName, "/")[1] diff --git a/pkg/scalers/prometheus_scaler.go b/pkg/scalers/prometheus_scaler.go index a7e2dd6bcec..438be2ed73b 100644 --- a/pkg/scalers/prometheus_scaler.go +++ b/pkg/scalers/prometheus_scaler.go @@ -65,7 +65,7 @@ func NewPrometheusScaler(config *ScalerConfig) (Scaler, error) { httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) - if meta.prometheusAuth.CA != "" || meta.prometheusAuth.EnableTLS { + if meta.prometheusAuth != nil && (meta.prometheusAuth.CA != "" || meta.prometheusAuth.EnableTLS) { // create http.RoundTripper with auth settings from ScalerConfig if httpClient.Transport, err = authentication.CreateHTTPRoundTripper( authentication.NetHTTP, From 360fb6e5012b96ad9795d124a665bdd4bdf0f2b2 Mon Sep 17 00:00:00 2001 From: alex60217101990 Date: Wed, 12 Jan 2022 13:14:26 +0200 Subject: [PATCH 5/7] fix prometheus_scaler.go Signed-off-by: alex60217101990 --- .../authentication/authentication_helpers.go | 4 +- pkg/scalers/prometheus_scaler.go | 2 +- tests/scalers/predictkube.test.ts | 222 ++++++++++++++++++ 3 files changed, 225 insertions(+), 3 deletions(-) create mode 100644 tests/scalers/predictkube.test.ts diff --git a/pkg/scalers/authentication/authentication_helpers.go b/pkg/scalers/authentication/authentication_helpers.go index bb5b9af2988..fd8b260172f 100644 --- a/pkg/scalers/authentication/authentication_helpers.go +++ b/pkg/scalers/authentication/authentication_helpers.go @@ -22,6 +22,8 @@ const ( ) func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMeta, err error) { + out = &AuthMeta{} + authModes, ok := triggerMetadata[authModesKey] // no authMode specified if !ok { @@ -32,8 +34,6 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet for _, t := range authTypes { authType := Type(strings.TrimSpace(t)) - out = &AuthMeta{} - switch authType { case BearerAuthType: if len(authParams["bearerToken"]) == 0 { diff --git a/pkg/scalers/prometheus_scaler.go b/pkg/scalers/prometheus_scaler.go index 438be2ed73b..353a4308de1 100644 --- a/pkg/scalers/prometheus_scaler.go +++ b/pkg/scalers/prometheus_scaler.go @@ -164,7 +164,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error return -1, err } - if s.metadata.prometheusAuth.EnableBearerAuth { + if s.metadata.prometheusAuth != nil && s.metadata.prometheusAuth.EnableBearerAuth { req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.metadata.prometheusAuth.BearerToken)) } else if s.metadata.prometheusAuth.EnableBasicAuth { req.SetBasicAuth(s.metadata.prometheusAuth.Username, s.metadata.prometheusAuth.Password) diff --git a/tests/scalers/predictkube.test.ts b/tests/scalers/predictkube.test.ts new file mode 100644 index 00000000000..f6302c51c51 --- /dev/null +++ b/tests/scalers/predictkube.test.ts @@ -0,0 +1,222 @@ +import * as fs from 'fs' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import test from 'ava' +import {waitForRollout} from "./helpers"; + +const testNamespace = 'predictkube-test' +const prometheusNamespace = 'monitoring' +const prometheusDeploymentFile = 'scalers/prometheus-deployment.yaml' + +test.before(t => { + // install prometheus + sh.exec(`kubectl create namespace ${prometheusNamespace}`) + t.is(0, sh.exec(`kubectl apply --namespace ${prometheusNamespace} -f ${prometheusDeploymentFile}`).code, 'creating a Prometheus deployment should work.') + // wait for prometheus to load + t.is(0, waitForRollout('deployment', "prometheus-server", prometheusNamespace)) + + sh.config.silent = true + // create deployments - there are two deployments - both using the same image but one deployment + // is directly tied to the KEDA HPA while the other is isolated that can be used for metrics + // even when the KEDA deployment is at zero - the service points to both deployments + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, deployYaml.replace('{{PROMETHEUS_NAMESPACE}}', prometheusNamespace)) + sh.exec(`kubectl create namespace ${testNamespace}`) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment should work.' + ) + for (let i = 0; i < 10; i++) { + const readyReplicaCount = sh.exec(`kubectl get deployment.apps/test-app --namespace ${testNamespace} -o jsonpath="{.status.readyReplicas}`).stdout + if (readyReplicaCount != '1') { + sh.exec('sleep 2s') + } + } +}) + +test.serial('Deployment should have 0 replicas on start', t => { + const replicaCount = sh.exec( + `kubectl get deployment.apps/keda-test-app --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + +test.serial(`Deployment should scale to 5 (the max) with HTTP Requests exceeding in the rate then back to 0`, t => { + // generate a large number of HTTP requests (using Apache Bench) that will take some time + // so prometheus has some time to scrape it + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, generateRequestsYaml.replace('{{NAMESPACE}}', testNamespace)) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating job should work.' + ) + + t.is( + '1', + sh.exec( + `kubectl get deployment.apps/test-app --namespace ${testNamespace} -o jsonpath="{.status.readyReplicas}"` + ).stdout, + 'There should be 1 replica for the test-app deployment' + ) + + // keda based deployment should start scaling up with http requests issued + let replicaCount = '0' + for (let i = 0; i < 60 && replicaCount !== '5'; i++) { + t.log(`Waited ${5 * i} seconds for prometheus-based deployments to scale up`) + const jobLogs = sh.exec(`kubectl logs -l job-name=generate-requests -n ${testNamespace}`).stdout + t.log(`Logs from the generate requests: ${jobLogs}`) + + replicaCount = sh.exec( + `kubectl get deployment.apps/keda-test-app --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + if (replicaCount !== '5') { + sh.exec('sleep 5s') + } + } + + t.is('5', replicaCount, 'Replica count should be maxed at 5') + + for (let i = 0; i < 50 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment.apps/keda-test-app --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + if (replicaCount !== '0') { + sh.exec('sleep 5s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 after 3 minutes') +}) + +test.after.always.cb('clean up prometheus deployment', t => { + const resources = [ + 'scaledobject.keda.sh/predictkube-scaledobject', + 'deployment.apps/test-app', + 'deployment.apps/keda-test-app', + 'service/test-app', + 'job/generate-requests', + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} --namespace ${testNamespace}`) + } + sh.exec(`kubectl delete namespace ${testNamespace}`) + + // uninstall prometheus + sh.exec(`kubectl delete --namespace ${prometheusNamespace} -f ${prometheusDeploymentFile}`) + sh.exec(`kubectl delete namespace ${prometheusNamespace}`) + + t.end() +}) + +const deployYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: test-app + name: test-app +spec: + replicas: 1 + selector: + matchLabels: + app: test-app + template: + metadata: + labels: + app: test-app + type: keda-testing + spec: + containers: + - name: prom-test-app + image: tbickford/simple-web-app-prometheus:a13ade9 + imagePullPolicy: IfNotPresent +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: keda-test-app + name: keda-test-app +spec: + replicas: 0 + selector: + matchLabels: + app: keda-test-app + template: + metadata: + labels: + app: keda-test-app + type: keda-testing + spec: + containers: + - name: prom-test-app + image: tbickford/simple-web-app-prometheus:a13ade9 + imagePullPolicy: IfNotPresent +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: test-app + annotations: + prometheus.io/scrape: "true" + name: test-app +spec: + ports: + - name: http + port: 80 + protocol: TCP + targetPort: 8080 + selector: + type: keda-testing +--- +apiVersion: v1 +kind: Secret +metadata: + name: predictkube-secrets +type: Opaque +data: + apiKey: MTIzNDU2Nwo= +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: predictkube-scaledobject +spec: + scaleTargetRef: + name: keda-test-app + minReplicaCount: 0 + maxReplicaCount: 5 + pollingInterval: 5 + cooldownPeriod: 10 + triggers: + - type: predictkube + metadata: + predictHorizon: "2h" + historyTimeWindow: "7d" + prometheusAddress: http://prometheus-server.{{PROMETHEUS_NAMESPACE}}.svc + metricName: http_requests_total + threshold: '100' + query: sum(rate(http_requests_total{app="test-app"}[2m])) + queryStep: "2m" + authenticationRef: + name: predictkube-trigger + ` + +const generateRequestsYaml = `apiVersion: batch/v1 +kind: Job +metadata: + name: generate-requests +spec: + template: + spec: + containers: + - image: jordi/ab + name: test + command: ["/bin/sh"] + args: ["-c", "for i in $(seq 1 60);do echo $i;ab -c 5 -n 1000 -v 2 http://test-app.{{NAMESPACE}}.svc/;sleep 1;done"] + restartPolicy: Never + activeDeadlineSeconds: 120 + backoffLimit: 2` From f86d99de8286a276e7edf340ba0a42915c01173b Mon Sep 17 00:00:00 2001 From: alex60217101990 Date: Wed, 12 Jan 2022 16:00:52 +0200 Subject: [PATCH 6/7] fix prometheus_scaler.go Signed-off-by: alex60217101990 --- pkg/scalers/prometheus_scaler.go | 2 +- tests/scalers/predictkube.test.ts | 23 ++++++++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/pkg/scalers/prometheus_scaler.go b/pkg/scalers/prometheus_scaler.go index 353a4308de1..9a311dc038e 100644 --- a/pkg/scalers/prometheus_scaler.go +++ b/pkg/scalers/prometheus_scaler.go @@ -166,7 +166,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error if s.metadata.prometheusAuth != nil && s.metadata.prometheusAuth.EnableBearerAuth { req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.metadata.prometheusAuth.BearerToken)) - } else if s.metadata.prometheusAuth.EnableBasicAuth { + } else if s.metadata.prometheusAuth != nil && s.metadata.prometheusAuth.EnableBasicAuth { req.SetBasicAuth(s.metadata.prometheusAuth.Username, s.metadata.prometheusAuth.Password) } diff --git a/tests/scalers/predictkube.test.ts b/tests/scalers/predictkube.test.ts index f6302c51c51..784ace129df 100644 --- a/tests/scalers/predictkube.test.ts +++ b/tests/scalers/predictkube.test.ts @@ -4,6 +4,7 @@ import * as tmp from 'tmp' import test from 'ava' import {waitForRollout} from "./helpers"; +const predictkubeApiKey = process.env['PREDICTKUBE_API_KEY'] const testNamespace = 'predictkube-test' const prometheusNamespace = 'monitoring' const prometheusDeploymentFile = 'scalers/prometheus-deployment.yaml' @@ -20,7 +21,10 @@ test.before(t => { // is directly tied to the KEDA HPA while the other is isolated that can be used for metrics // even when the KEDA deployment is at zero - the service points to both deployments const tmpFile = tmp.fileSync() - fs.writeFileSync(tmpFile.name, deployYaml.replace('{{PROMETHEUS_NAMESPACE}}', prometheusNamespace)) + fs.writeFileSync(tmpFile.name, deployYaml + .replace('{{PREDICTKUBE_API_KEY}}', Buffer.from(predictkubeApiKey).toString('base64')) + .replace('{{PROMETHEUS_NAMESPACE}}', prometheusNamespace) + ) sh.exec(`kubectl create namespace ${testNamespace}`) t.is( 0, @@ -172,13 +176,23 @@ spec: selector: type: keda-testing --- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: predictkube-trigger +spec: + secretTargetRef: + - parameter: apiKey + name: predictkube-secret + key: apiKey +--- apiVersion: v1 kind: Secret metadata: - name: predictkube-secrets + name: predictkube-secret type: Opaque data: - apiKey: MTIzNDU2Nwo= + apiKey: {{PREDICTKUBE_API_KEY}} --- apiVersion: keda.sh/v1alpha1 kind: ScaledObject @@ -202,8 +216,7 @@ spec: query: sum(rate(http_requests_total{app="test-app"}[2m])) queryStep: "2m" authenticationRef: - name: predictkube-trigger - ` + name: predictkube-trigger` const generateRequestsYaml = `apiVersion: batch/v1 kind: Job From eae6c6f6a2d35b8f75c9a4811c9a749ef9d60815 Mon Sep 17 00:00:00 2001 From: alex60217101990 Date: Thu, 13 Jan 2022 11:58:53 +0200 Subject: [PATCH 7/7] add working e2e test for predictkube scaler Signed-off-by: alex60217101990 --- pkg/scalers/predictkube_scaler.go | 7 ++++++- tests/scalers/predictkube.test.ts | 7 ++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/scalers/predictkube_scaler.go b/pkg/scalers/predictkube_scaler.go index b5556a33db2..99341a92f15 100644 --- a/pkg/scalers/predictkube_scaler.go +++ b/pkg/scalers/predictkube_scaler.go @@ -182,7 +182,12 @@ func (s *PredictKubeScaler) IsActive(ctx context.Context) (bool, error) { return len(results) > 0, fmt.Errorf("can't connect grpc server: %v, code: %v", err, status.Code(err)) } - return len(results) > 0, nil + var y int64 + if len(results) > 0 { + y = int64(results[len(results)-1].Value) + } + + return y > 0, nil } func (s *PredictKubeScaler) Close(_ context.Context) error { diff --git a/tests/scalers/predictkube.test.ts b/tests/scalers/predictkube.test.ts index 784ace129df..131e420cd85 100644 --- a/tests/scalers/predictkube.test.ts +++ b/tests/scalers/predictkube.test.ts @@ -68,7 +68,7 @@ test.serial(`Deployment should scale to 5 (the max) with HTTP Requests exceeding // keda based deployment should start scaling up with http requests issued let replicaCount = '0' for (let i = 0; i < 60 && replicaCount !== '5'; i++) { - t.log(`Waited ${5 * i} seconds for prometheus-based deployments to scale up`) + t.log(`Waited ${5 * i} seconds for predictkube-based deployments to scale up`) const jobLogs = sh.exec(`kubectl logs -l job-name=generate-requests -n ${testNamespace}`).stdout t.log(`Logs from the generate requests: ${jobLogs}`) @@ -82,7 +82,8 @@ test.serial(`Deployment should scale to 5 (the max) with HTTP Requests exceeding t.is('5', replicaCount, 'Replica count should be maxed at 5') - for (let i = 0; i < 50 && replicaCount !== '0'; i++) { + for (let i = 0; i < 60 && replicaCount !== '0'; i++) { + t.log(`Waited ${5 * i} seconds for predictkube-based deployments to scale down`) replicaCount = sh.exec( `kubectl get deployment.apps/keda-test-app --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` ).stdout @@ -94,7 +95,7 @@ test.serial(`Deployment should scale to 5 (the max) with HTTP Requests exceeding t.is('0', replicaCount, 'Replica count should be 0 after 3 minutes') }) -test.after.always.cb('clean up prometheus deployment', t => { +test.after.always.cb('clean up predictkube deployment', t => { const resources = [ 'scaledobject.keda.sh/predictkube-scaledobject', 'deployment.apps/test-app',