diff --git a/.golangci.yml b/.golangci.yml
index 6bd89d53783..48fb6a144b6 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -55,7 +55,7 @@ issues:
 linters:
 - gomnd
 - dupl
- - path: scale_handler.go
+ - path: scalers_builder.go
 linters:
 - gocyclo
 - path: scale_scaledobjects.go
diff --git a/CREATE-NEW-SCALER.md b/CREATE-NEW-SCALER.md
index 1ec9ee828f6..8bf1a316d65 100644
--- a/CREATE-NEW-SCALER.md
+++ b/CREATE-NEW-SCALER.md
@@ -8,7 +8,7 @@ In order to develop a scaler, a developer should do the following:
 2. Create the new scaler struct under the `pkg/scalers` folder.
 3. Implement the methods defined in the [scaler interface](#scaler-interface) section.
 4. Create a constructor according to [this](#constructor).
-5. Change the `buildScaler` function in `pkg/scaling/scale_handler.go` by adding another switch case that matches your scaler. Scalers in the switch are ordered alphabetically, please follow the same pattern.
+5. Change the `buildScaler` function in `pkg/scaling/scalers_builder.go` by adding another switch case that matches your scaler. Scalers in the switch are ordered alphabetically; please follow the same pattern.
 6. Run `make build` from the root of KEDA and your scaler is ready.

 If you want to deploy locally
diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go
index d06f9bc2be8..de14500b9cf 100644
--- a/pkg/scaling/cache/scalers_cache.go
+++ b/pkg/scaling/cache/scalers_cache.go
@@ -48,6 +48,7 @@ type ScalerBuilder struct {
 Factory func() (scalers.Scaler, *scalers.ScalerConfig, error)
 }

+// GetScalers returns the arrays of scalers and scaler configs stored in the cache
 func (c *ScalersCache) GetScalers() ([]scalers.Scaler, []scalers.ScalerConfig) {
 scalersList := make([]scalers.Scaler, 0, len(c.Scalers))
 configsList := make([]scalers.ScalerConfig, 0, len(c.Scalers))
@@ -59,6 +60,7 @@ func (c *ScalersCache) GetScalers() ([]scalers.Scaler, []scalers.ScalerConfig) {
 return scalersList, configsList
 }

+// GetPushScalers returns the array of push scalers stored in the cache
 func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {
 var result []scalers.PushScaler
 for _, s := range c.Scalers {
@@ -69,6 +71,27 @@ func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {
 return result
 }

+// Close closes all scalers in the cache
+func (c *ScalersCache) Close(ctx context.Context) {
+ scalers := c.Scalers
+ c.Scalers = nil
+ for _, s := range scalers {
+ err := s.Scaler.Close(ctx)
+ if err != nil {
+ log.Error(err, "error closing scaler", "scaler", s)
+ }
+ }
+}
+
+// GetMetricSpecForScaling returns the metric specs for all scalers in the cache
+func (c *ScalersCache) GetMetricSpecForScaling(ctx context.Context) []v2.MetricSpec {
+ var spec []v2.MetricSpec
+ for _, s := range c.Scalers {
+ spec = append(spec, s.Scaler.GetMetricSpecForScaling(ctx)...)
+ }
+ return spec
+}
+
 // GetMetricSpecForScalingForScaler returns metrics spec for a scaler identified by the metric name
 func (c *ScalersCache) GetMetricSpecForScalingForScaler(ctx context.Context, index int) ([]v2.MetricSpec, error) {
 var err error
@@ -118,6 +141,8 @@ func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index
 return metric, activity, time.Since(startTime).Milliseconds(), err
 }

+// TODO needs refactoring - move the ScaledJob related methods to scale_handler, the same way
+// the ScaledObject methods are handled, and refactor the logic
 func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
 var queueLength float64
 var maxValue float64
@@ -202,32 +227,12 @@ func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scale
 return ns, nil
 }

-func (c *ScalersCache) GetMetricSpecForScaling(ctx context.Context) []v2.MetricSpec {
- var spec []v2.MetricSpec
- for _, s := range c.Scalers {
- spec = append(spec, s.Scaler.GetMetricSpecForScaling(ctx)...)
- }
- return spec
-}
-
-func (c *ScalersCache) Close(ctx context.Context) {
- scalers := c.Scalers
- c.Scalers = nil
- for _, s := range scalers {
- err := s.Scaler.Close(ctx)
- if err != nil {
- log.Error(err, "error closing scaler", "scaler", s)
- }
- }
-}
-
 type scalerMetrics struct {
 queueLength float64
 maxValue float64
 isActive bool
 }

-// TODO needs refactor
 func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics {
 // TODO this loop should be probably done similar way the ScaledObject loop is done
 var scalersMetrics []scalerMetrics
diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go
index 0d8905d475e..454606931b7 100644
--- a/pkg/scaling/scale_handler.go
+++ b/pkg/scaling/scale_handler.go
@@ -85,6 +85,11 @@ func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, recon
 }
 }

+/// --------------------------------------------------------------------------- ///
+/// ---------- Scaling logic related methods --------- ///
+/// --------------------------------------------------------------------------- ///
+
+// HandleScalableObject is the initial method called when a ScalableObject is created; it handles the main scaling logic
 func (h *scaleHandler) HandleScalableObject(ctx context.Context, scalableObject interface{}) error {
 withTriggers, err := kedav1alpha1.AsDuckWithTriggers(scalableObject)
 if err != nil {
@@ -122,6 +127,7 @@ func (h *scaleHandler) HandleScalableObject(ctx context.Context, scalableObject
 return nil
 }

+// DeleteScalableObject stops the handling logic for the input ScalableObject
 func (h *scaleHandler) DeleteScalableObject(ctx context.Context, scalableObject interface{}) error {
 withTriggers, err := kedav1alpha1.AsDuckWithTriggers(scalableObject)
 if err != nil {
@@ -149,7 +155,7 @@ func (h *scaleHandler) DeleteScalableObject(ctx context.Context, scalableObject
 return nil
 }

-// startScaleLoop blocks forever and checks the scaledObject based on its pollingInterval
+// startScaleLoop blocks forever and checks the scalableObject based on its pollingInterval
 func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}, scalingMutex sync.Locker) {
 logger := log.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name)
@@ -175,6 +181,84 @@ func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1a
 }
 }

+// startPushScalers starts all push scalers defined in the input scalableObject
+func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}, scalingMutex sync.Locker) {
+ logger := log.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name)
+ cache, err := h.GetScalersCache(ctx, scalableObject)
+ if err != nil {
+ logger.Error(err, "Error getting scalers", "object", scalableObject)
+ return
+ }
+
+ for _, ps := range cache.GetPushScalers() {
+ go func(s scalers.PushScaler) {
+ activeCh := make(chan bool)
+ go s.Run(ctx, activeCh)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case active := <-activeCh:
+ scalingMutex.Lock()
+ switch obj := scalableObject.(type) {
+ case *kedav1alpha1.ScaledObject:
+ h.scaleExecutor.RequestScale(ctx, obj, active, false)
+ case *kedav1alpha1.ScaledJob:
+ logger.Info("Warning: External Push Scaler does not support ScaledJob", "object", scalableObject)
+ }
+ scalingMutex.Unlock()
+ }
+ }
+ }(ps)
+ }
+}
+
+// checkScalers contains the main scaling logic of the ScaleHandler.
+// It checks each trigger's active status and then calls RequestScale
+func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interface{}, scalingMutex sync.Locker) {
+ scalingMutex.Lock()
+ defer scalingMutex.Unlock()
+ switch obj := scalableObject.(type) {
+ case *kedav1alpha1.ScaledObject:
+ err := h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj)
+ if err != nil {
+ log.Error(err, "error getting scaledObject", "object", scalableObject)
+ return
+ }
+ isActive, isError, metricsRecords, err := h.getScaledObjectState(ctx, obj)
+ if err != nil {
+ log.Error(err, "error getting state of scaledObject", "scaledObject.Namespace", obj.Namespace, "scaledObject.Name", obj.Name)
+ return
+ }
+
+ h.scaleExecutor.RequestScale(ctx, obj, isActive, isError)
+
+ if len(metricsRecords) > 0 {
+ log.V(1).Info("Storing metrics to cache", "scaledObject.Namespace", obj.Namespace, "scaledObject.Name", obj.Name, "metricsRecords", metricsRecords)
+ h.scaledObjectsMetricCache.StoreRecords(obj.GenerateIdentifier(), metricsRecords)
+ }
+ case *kedav1alpha1.ScaledJob:
+ cache, err := h.GetScalersCache(ctx, scalableObject)
+ if err != nil {
+ log.Error(err, "error getting scalers cache", "scaledJob.Namespace", obj.Namespace, "scaledJob.Name", obj.Name)
+ return
+ }
+
+ err = h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj)
+ if err != nil {
+ log.Error(err, "error getting scaledJob", "scaledJob.Namespace", obj.Namespace, "scaledJob.Name", obj.Name)
+ return
+ }
+
+ isActive, scaleTo, maxScale := cache.IsScaledJobActive(ctx, obj)
+ h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale)
+ }
+}
+
+/// --------------------------------------------------------------------------- ///
+/// ---------- ScalersCache related methods --------- ///
+/// --------------------------------------------------------------------------- ///
+
 // GetScalersCache returns cache for input scalableObject, if the object is not found in the cache, it returns a new one
 // if the input object is ScaledObject, it also compares the Generation of the input of object with the one stored in the cache,
 // this is needed for out of scalerLoop invocations of this method (in package `controllers/keda`).
@@ -286,6 +370,7 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s
 return h.scalerCaches[key], nil
 }

+// ClearScalersCache invalidates the cache for the input scalableObject
 func (h *scaleHandler) ClearScalersCache(ctx context.Context, scalableObject interface{}) error {
 withTriggers, err := kedav1alpha1.AsDuckWithTriggers(scalableObject)
 if err != nil {
@@ -307,71 +392,9 @@ func (h *scaleHandler) ClearScalersCache(ctx context.Context, scalableObject int
 return nil
 }

-func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}, scalingMutex sync.Locker) {
- logger := log.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name)
- cache, err := h.GetScalersCache(ctx, scalableObject)
- if err != nil {
- logger.Error(err, "Error getting scalers", "object", scalableObject)
- return
- }
-
- for _, ps := range cache.GetPushScalers() {
- go func(s scalers.PushScaler) {
- activeCh := make(chan bool)
- go s.Run(ctx, activeCh)
- for {
- select {
- case <-ctx.Done():
- return
- case active := <-activeCh:
- scalingMutex.Lock()
- switch obj := scalableObject.(type) {
- case *kedav1alpha1.ScaledObject:
- h.scaleExecutor.RequestScale(ctx, obj, active, false)
- case *kedav1alpha1.ScaledJob:
- logger.Info("Warning: External Push Scaler does not support ScaledJob", "object", scalableObject)
- }
- scalingMutex.Unlock()
- }
- }
- }(ps)
- }
-}
-
-// checkScalers contains the main logic for the ScaleHandler scaling logic.
-// It'll check each trigger active status then call RequestScale
-func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interface{}, scalingMutex sync.Locker) {
- scalingMutex.Lock()
- defer scalingMutex.Unlock()
- switch obj := scalableObject.(type) {
- case *kedav1alpha1.ScaledObject:
- err := h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj)
- if err != nil {
- log.Error(err, "Error getting scaledObject", "object", scalableObject)
- return
- }
- isActive, isError, metricsRecords := h.getScaledObjectState(ctx, obj)
- h.scaleExecutor.RequestScale(ctx, obj, isActive, isError)
- if len(metricsRecords) > 0 {
- log.V(1).Info("Storing metrics to cache", "scaledObject.Namespace", obj.Namespace, "scaledObject.Name", obj.Name, "metricsRecords", metricsRecords)
- h.scaledObjectsMetricCache.StoreRecords(obj.GenerateIdentifier(), metricsRecords)
- }
- case *kedav1alpha1.ScaledJob:
- cache, err := h.GetScalersCache(ctx, scalableObject)
- if err != nil {
- log.Error(err, "Error getting scalers", "object", scalableObject)
- return
- }
-
- err = h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj)
- if err != nil {
- log.Error(err, "Error getting scaledJob", "object", scalableObject)
- return
- }
- isActive, scaleTo, maxScale := cache.IsScaledJobActive(ctx, obj)
- h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale)
- }
-}
+/// --------------------------------------------------------------------------- ///
+/// ---------- ScaledObject related methods --------- ///
+/// --------------------------------------------------------------------------- ///

 // GetScaledObjectMetrics returns metrics for specified metric name for a ScaledObject identified by it's name and namespace.
 // The second return value are Prometheus metrics that needed to be exposed (used by DEPRECATED Prometheus Server on KEDA Metrics Server)
@@ -394,7 +417,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
 exportedPromMetrics.ScaledObjectErr = (err != nil)

 if err != nil {
- return nil, &exportedPromMetrics, fmt.Errorf("error when getting scalers %w", err)
+ return nil, &exportedPromMetrics, fmt.Errorf("error getting scalers %w", err)
 }

 var scaledObject *kedav1alpha1.ScaledObject
@@ -509,10 +532,12 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
 }, &exportedPromMetrics, nil
 }

-// getScaledObjectState returns whether the input ScaledObject is active as the first parameter,
-// the second parameter indicates whether there was any error during quering scalers
-// the third parameter returns map of metrics record - a metric value for each scaler and it's metric
-func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (bool, bool, map[string]metricscache.MetricsRecord) {
+// getScaledObjectState returns four values:
+// the first indicates whether the input ScaledObject is active,
+// the second indicates whether there was any error during querying scalers,
+// the third is a map of metrics records - a metric value for each scaler and its metric,
+// the fourth contains an error if it was not possible to access the scalers cache
+func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (bool, bool, map[string]metricscache.MetricsRecord, error) {
 logger := log.WithValues("scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name)

 isScaledObjectActive := false
@@ -521,6 +546,9 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
 cache, err := h.GetScalersCache(ctx, scaledObject)
 prommetrics.RecordScaledObjectError(scaledObject.Namespace, scaledObject.Name, err)
+ if err != nil {
+ return false, true, map[string]metricscache.MetricsRecord{}, fmt.Errorf("error getting scalers cache %w", err)
+ }

 // Let's collect status of all scalers, no matter if any scaler raises error or is active
 scalers, scalerConfigs := cache.GetScalers()
@@ -596,201 +624,5 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
 logger.V(1).Info("scaler error encountered, clearing scaler cache")
 }

- return isScaledObjectActive, isScalerError, metricsRecord
-}
-
-// buildScalers returns list of Scalers for the specified triggers
-func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, podTemplateSpec *corev1.PodTemplateSpec, containerName string) ([]cache.ScalerBuilder, error) {
- logger := log.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name)
- var err error
- resolvedEnv := make(map[string]string)
- result := make([]cache.ScalerBuilder, 0, len(withTriggers.Spec.Triggers))
-
- for i, t := range withTriggers.Spec.Triggers {
- triggerIndex, trigger := i, t
-
- factory := func() (scalers.Scaler, *scalers.ScalerConfig, error) {
- if podTemplateSpec != nil {
- resolvedEnv, err = resolver.ResolveContainerEnv(ctx, h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace, h.secretsLister)
- if err != nil {
- return nil, nil, fmt.Errorf("error resolving secrets for ScaleTarget: %w", err)
- }
- }
- config := &scalers.ScalerConfig{
- ScalableObjectName: 
withTriggers.Name, - ScalableObjectNamespace: withTriggers.Namespace, - ScalableObjectType: withTriggers.Kind, - TriggerName: trigger.Name, - TriggerMetadata: trigger.Metadata, - TriggerUseCachedMetrics: trigger.UseCachedMetrics, - ResolvedEnv: resolvedEnv, - AuthParams: make(map[string]string), - GlobalHTTPTimeout: h.globalHTTPTimeout, - ScalerIndex: triggerIndex, - MetricType: trigger.MetricType, - } - - authParams, podIdentity, err := resolver.ResolveAuthRefAndPodIdentity(ctx, h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace, h.secretsLister) - if err != nil { - return nil, nil, err - } - config.AuthParams = authParams - config.PodIdentity = podIdentity - scaler, err := buildScaler(ctx, h.client, trigger.Type, config) - return scaler, config, err - } - - scaler, config, err := factory() - if err != nil { - h.recorder.Event(withTriggers, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - logger.Error(err, "error resolving auth params", "scalerIndex", triggerIndex) - if scaler != nil { - scaler.Close(ctx) - } - for _, builder := range result { - builder.Scaler.Close(ctx) - } - return nil, err - } - - result = append(result, cache.ScalerBuilder{ - Scaler: scaler, - ScalerConfig: *config, - Factory: factory, - }) - } - - return result, nil -} - -func buildScaler(ctx context.Context, client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) { - // TRIGGERS-START - switch triggerType { - case "activemq": - return scalers.NewActiveMQScaler(config) - case "arangodb": - return scalers.NewArangoDBScaler(config) - case "artemis-queue": - return scalers.NewArtemisQueueScaler(config) - case "aws-cloudwatch": - return scalers.NewAwsCloudwatchScaler(config) - case "aws-dynamodb": - return scalers.NewAwsDynamoDBScaler(config) - case "aws-dynamodb-streams": - return scalers.NewAwsDynamoDBStreamsScaler(ctx, config) - case "aws-kinesis-stream": - return scalers.NewAwsKinesisStreamScaler(config) - case "aws-sqs-queue": - return scalers.NewAwsSqsQueueScaler(config) - case "azure-app-insights": - return scalers.NewAzureAppInsightsScaler(config) - case "azure-blob": - return scalers.NewAzureBlobScaler(config) - case "azure-data-explorer": - return scalers.NewAzureDataExplorerScaler(ctx, config) - case "azure-eventhub": - return scalers.NewAzureEventHubScaler(ctx, config) - case "azure-log-analytics": - return scalers.NewAzureLogAnalyticsScaler(config) - case "azure-monitor": - return scalers.NewAzureMonitorScaler(config) - case "azure-pipelines": - return scalers.NewAzurePipelinesScaler(ctx, config) - case "azure-queue": - return scalers.NewAzureQueueScaler(config) - case "azure-servicebus": - return scalers.NewAzureServiceBusScaler(ctx, config) - case "cassandra": - return scalers.NewCassandraScaler(config) - case "couchdb": - return scalers.NewCouchDBScaler(ctx, config) - case "cpu": - return scalers.NewCPUMemoryScaler(corev1.ResourceCPU, config) - case "cron": - return scalers.NewCronScaler(config) - case "datadog": - return scalers.NewDatadogScaler(ctx, config) - case "elasticsearch": - return scalers.NewElasticsearchScaler(config) - case "etcd": - return scalers.NewEtcdScaler(config) - case "external": - return scalers.NewExternalScaler(config) - // TODO: use other way for test. 
- case "external-mock": - return scalers.NewExternalMockScaler(config) - case "external-push": - return scalers.NewExternalPushScaler(config) - case "gcp-pubsub": - return scalers.NewPubSubScaler(config) - case "gcp-stackdriver": - return scalers.NewStackdriverScaler(ctx, config) - case "gcp-storage": - return scalers.NewGcsScaler(config) - case "graphite": - return scalers.NewGraphiteScaler(config) - case "huawei-cloudeye": - return scalers.NewHuaweiCloudeyeScaler(config) - case "ibmmq": - return scalers.NewIBMMQScaler(config) - case "influxdb": - return scalers.NewInfluxDBScaler(config) - case "kafka": - return scalers.NewKafkaScaler(config) - case "kubernetes-workload": - return scalers.NewKubernetesWorkloadScaler(client, config) - case "liiklus": - return scalers.NewLiiklusScaler(config) - case "loki": - return scalers.NewLokiScaler(config) - case "memory": - return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config) - case "metrics-api": - return scalers.NewMetricsAPIScaler(config) - case "mongodb": - return scalers.NewMongoDBScaler(ctx, config) - case "mssql": - return scalers.NewMSSQLScaler(config) - case "mysql": - return scalers.NewMySQLScaler(config) - case "nats-jetstream": - return scalers.NewNATSJetStreamScaler(config) - case "new-relic": - return scalers.NewNewRelicScaler(config) - case "openstack-metric": - return scalers.NewOpenstackMetricScaler(ctx, config) - case "openstack-swift": - return scalers.NewOpenstackSwiftScaler(ctx, config) - case "postgresql": - return scalers.NewPostgreSQLScaler(config) - case "predictkube": - return scalers.NewPredictKubeScaler(ctx, config) - case "prometheus": - return scalers.NewPrometheusScaler(config) - case "pulsar": - return scalers.NewPulsarScaler(config) - case "rabbitmq": - return scalers.NewRabbitMQScaler(config) - case "redis": - return scalers.NewRedisScaler(ctx, false, false, config) - case "redis-cluster": - return scalers.NewRedisScaler(ctx, true, false, config) - case "redis-cluster-streams": - return scalers.NewRedisStreamsScaler(ctx, true, false, config) - case "redis-sentinel": - return scalers.NewRedisScaler(ctx, false, true, config) - case "redis-sentinel-streams": - return scalers.NewRedisStreamsScaler(ctx, false, true, config) - case "redis-streams": - return scalers.NewRedisStreamsScaler(ctx, false, false, config) - case "selenium-grid": - return scalers.NewSeleniumGridScaler(config) - case "solace-event-queue": - return scalers.NewSolaceScaler(config) - case "stan": - return scalers.NewStanScaler(config) - default: - return nil, fmt.Errorf("no scaler found for type: %s", triggerType) - } - // TRIGGERS-END + return isScaledObjectActive, isScalerError, metricsRecord, nil } diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index 7137011846a..79b2ac155ee 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -277,7 +277,7 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) { scaledObjectsMetricCache: metricscache.NewMetricsCache(), } - isActive, isError, _ := sh.getScaledObjectState(context.TODO(), &scaledObject) + isActive, isError, _, _ := sh.getScaledObjectState(context.TODO(), &scaledObject) scalerCache.Close(context.Background()) assert.Equal(t, false, isActive) @@ -352,7 +352,7 @@ func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) { scaledObjectsMetricCache: metricscache.NewMetricsCache(), } - isActive, isError, _ := sh.getScaledObjectState(context.TODO(), &scaledObject) + isActive, isError, _, _ := 
sh.getScaledObjectState(context.TODO(), &scaledObject)
 scalerCache.Close(context.Background())

 assert.Equal(t, true, isActive)
diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go
new file mode 100644
index 00000000000..9967e96dc0b
--- /dev/null
+++ b/pkg/scaling/scalers_builder.go
@@ -0,0 +1,232 @@
+/*
+Copyright 2023 The KEDA Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scaling
+
+import (
+ "context"
+ "fmt"
+
+ corev1 "k8s.io/api/core/v1"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
+ "github.com/kedacore/keda/v2/pkg/eventreason"
+ "github.com/kedacore/keda/v2/pkg/scalers"
+ "github.com/kedacore/keda/v2/pkg/scaling/cache"
+ "github.com/kedacore/keda/v2/pkg/scaling/resolver"
+)
+
+/// --------------------------------------------------------------------------- ///
+/// ---------- Scaler-Building related methods --------- ///
+/// --------------------------------------------------------------------------- ///
+
+// buildScalers returns a list of Scalers for the specified triggers
+func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, podTemplateSpec *corev1.PodTemplateSpec, containerName string) ([]cache.ScalerBuilder, error) {
+ logger := log.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name)
+ var err error
+ resolvedEnv := make(map[string]string)
+ result := make([]cache.ScalerBuilder, 0, len(withTriggers.Spec.Triggers))
+
+ for i, t := range withTriggers.Spec.Triggers {
+ triggerIndex, trigger := i, t
+
+ factory := func() (scalers.Scaler, *scalers.ScalerConfig, error) {
+ if podTemplateSpec != nil {
+ resolvedEnv, err = resolver.ResolveContainerEnv(ctx, h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace, h.secretsLister)
+ if err != nil {
+ return nil, nil, fmt.Errorf("error resolving secrets for ScaleTarget: %w", err)
+ }
+ }
+ config := &scalers.ScalerConfig{
+ ScalableObjectName: withTriggers.Name,
+ ScalableObjectNamespace: withTriggers.Namespace,
+ ScalableObjectType: withTriggers.Kind,
+ TriggerName: trigger.Name,
+ TriggerMetadata: trigger.Metadata,
+ TriggerUseCachedMetrics: trigger.UseCachedMetrics,
+ ResolvedEnv: resolvedEnv,
+ AuthParams: make(map[string]string),
+ GlobalHTTPTimeout: h.globalHTTPTimeout,
+ ScalerIndex: triggerIndex,
+ MetricType: trigger.MetricType,
+ }
+
+ authParams, podIdentity, err := resolver.ResolveAuthRefAndPodIdentity(ctx, h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace, h.secretsLister)
+ if err != nil {
+ return nil, nil, err
+ }
+ config.AuthParams = authParams
+ config.PodIdentity = podIdentity
+ scaler, err := buildScaler(ctx, h.client, trigger.Type, config)
+ return scaler, config, err
+ }
+
+ scaler, config, err := factory()
+ if err != nil {
+ h.recorder.Event(withTriggers, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
+ logger.Error(err, "error resolving auth params", "scalerIndex", triggerIndex)
+ if scaler != nil {
+ scaler.Close(ctx)
+ }
+ for _, builder := range result {
+ builder.Scaler.Close(ctx)
+ }
+ return nil, err
+ }
+
+ result = append(result, cache.ScalerBuilder{
+ Scaler: scaler,
+ ScalerConfig: *config,
+ Factory: factory,
+ })
+ }
+
+ return result, nil
+}
+
+// buildScaler builds a scaler from the input config and trigger type
+func buildScaler(ctx context.Context, client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) {
+ // TRIGGERS-START
+ switch triggerType {
+ case "activemq":
+ return scalers.NewActiveMQScaler(config)
+ case "arangodb":
+ return scalers.NewArangoDBScaler(config)
+ case "artemis-queue":
+ return scalers.NewArtemisQueueScaler(config)
+ case "aws-cloudwatch":
+ return scalers.NewAwsCloudwatchScaler(config)
+ case "aws-dynamodb":
+ return scalers.NewAwsDynamoDBScaler(config)
+ case "aws-dynamodb-streams":
+ return scalers.NewAwsDynamoDBStreamsScaler(ctx, config)
+ case "aws-kinesis-stream":
+ return scalers.NewAwsKinesisStreamScaler(config)
+ case "aws-sqs-queue":
+ return scalers.NewAwsSqsQueueScaler(config)
+ case "azure-app-insights":
+ return scalers.NewAzureAppInsightsScaler(config)
+ case "azure-blob":
+ return scalers.NewAzureBlobScaler(config)
+ case "azure-data-explorer":
+ return scalers.NewAzureDataExplorerScaler(ctx, config)
+ case "azure-eventhub":
+ return scalers.NewAzureEventHubScaler(ctx, config)
+ case "azure-log-analytics":
+ return scalers.NewAzureLogAnalyticsScaler(config)
+ case "azure-monitor":
+ return scalers.NewAzureMonitorScaler(config)
+ case "azure-pipelines":
+ return scalers.NewAzurePipelinesScaler(ctx, config)
+ case "azure-queue":
+ return scalers.NewAzureQueueScaler(config)
+ case "azure-servicebus":
+ return scalers.NewAzureServiceBusScaler(ctx, config)
+ case "cassandra":
+ return scalers.NewCassandraScaler(config)
+ case "couchdb":
+ return scalers.NewCouchDBScaler(ctx, config)
+ case "cpu":
+ return scalers.NewCPUMemoryScaler(corev1.ResourceCPU, config)
+ case "cron":
+ return scalers.NewCronScaler(config)
+ case "datadog":
+ return scalers.NewDatadogScaler(ctx, config)
+ case "elasticsearch":
+ return scalers.NewElasticsearchScaler(config)
+ case "etcd":
+ return scalers.NewEtcdScaler(config)
+ case "external":
+ return scalers.NewExternalScaler(config)
+ // TODO: use other way for test.
+ case "external-mock": + return scalers.NewExternalMockScaler(config) + case "external-push": + return scalers.NewExternalPushScaler(config) + case "gcp-pubsub": + return scalers.NewPubSubScaler(config) + case "gcp-stackdriver": + return scalers.NewStackdriverScaler(ctx, config) + case "gcp-storage": + return scalers.NewGcsScaler(config) + case "graphite": + return scalers.NewGraphiteScaler(config) + case "huawei-cloudeye": + return scalers.NewHuaweiCloudeyeScaler(config) + case "ibmmq": + return scalers.NewIBMMQScaler(config) + case "influxdb": + return scalers.NewInfluxDBScaler(config) + case "kafka": + return scalers.NewKafkaScaler(config) + case "kubernetes-workload": + return scalers.NewKubernetesWorkloadScaler(client, config) + case "liiklus": + return scalers.NewLiiklusScaler(config) + case "loki": + return scalers.NewLokiScaler(config) + case "memory": + return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config) + case "metrics-api": + return scalers.NewMetricsAPIScaler(config) + case "mongodb": + return scalers.NewMongoDBScaler(ctx, config) + case "mssql": + return scalers.NewMSSQLScaler(config) + case "mysql": + return scalers.NewMySQLScaler(config) + case "nats-jetstream": + return scalers.NewNATSJetStreamScaler(config) + case "new-relic": + return scalers.NewNewRelicScaler(config) + case "openstack-metric": + return scalers.NewOpenstackMetricScaler(ctx, config) + case "openstack-swift": + return scalers.NewOpenstackSwiftScaler(ctx, config) + case "postgresql": + return scalers.NewPostgreSQLScaler(config) + case "predictkube": + return scalers.NewPredictKubeScaler(ctx, config) + case "prometheus": + return scalers.NewPrometheusScaler(config) + case "pulsar": + return scalers.NewPulsarScaler(config) + case "rabbitmq": + return scalers.NewRabbitMQScaler(config) + case "redis": + return scalers.NewRedisScaler(ctx, false, false, config) + case "redis-cluster": + return scalers.NewRedisScaler(ctx, true, false, config) + case "redis-cluster-streams": + return scalers.NewRedisStreamsScaler(ctx, true, false, config) + case "redis-sentinel": + return scalers.NewRedisScaler(ctx, false, true, config) + case "redis-sentinel-streams": + return scalers.NewRedisStreamsScaler(ctx, false, true, config) + case "redis-streams": + return scalers.NewRedisStreamsScaler(ctx, false, false, config) + case "selenium-grid": + return scalers.NewSeleniumGridScaler(config) + case "solace-event-queue": + return scalers.NewSolaceScaler(config) + case "stan": + return scalers.NewStanScaler(config) + default: + return nil, fmt.Errorf("no scaler found for type: %s", triggerType) + } + // TRIGGERS-END +} diff --git a/tools/sort_scalers.sh b/tools/sort_scalers.sh index 2daefbafcfc..7ab215d4bfb 100644 --- a/tools/sort_scalers.sh +++ b/tools/sort_scalers.sh @@ -4,7 +4,7 @@ set -euo pipefail LEAD='TRIGGERS-START' TAIL='TRIGGERS-END' -SCALERS_FILE="pkg/scaling/scale_handler.go" +SCALERS_FILE="pkg/scaling/scalers_builder.go" CURRENT=$(cat "${SCALERS_FILE}" | awk "/${LEAD}/,/${TAIL}/" | grep "case") SORTED=$(cat "${SCALERS_FILE}" | awk "/${LEAD}/,/${TAIL}/" | grep "case" | sort)