diff --git a/CHANGELOG.md b/CHANGELOG.md
index 83d867f7afb..96175c0d25c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -95,6 +95,7 @@ New deprecation(s):
 - **General**: Bump Golang to 1.19 ([#4094](https://github.com/kedacore/keda/issues/4094))
 - **General**: Check that ScaledObject name is specified as part of a query for getting metrics ([#4088](https://github.com/kedacore/keda/pull/4088))
 - **General**: Compare error with `errors.Is` ([#4004](https://github.com/kedacore/keda/pull/4004))
+- **General**: Consolidate `GetMetrics` and `IsActive` to `GetMetricsAndActivity` for Azure Event Hub, Cron and External scalers ([#4015](https://github.com/kedacore/keda/issues/4015))
 - **General**: Improve test coverage in `pkg/util` ([#3871](https://github.com/kedacore/keda/issues/3871))
 - **General**: Pass deep copy object to scalers cache from the ScaledObject controller ([#4207](https://github.com/kedacore/keda/issues/4207))
 - **General**: Review CodeQL rules and enable it on PRs ([#4032](https://github.com/kedacore/keda/pull/4032))
diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go
index 88b206232f2..41503f7ba17 100644
--- a/pkg/scalers/azure_eventhub_scaler.go
+++ b/pkg/scalers/azure_eventhub_scaler.go
@@ -327,38 +327,6 @@ func GetUnprocessedEventCountWithoutCheckpoint(partitionInfo *eventhub.HubPartit
 	return 0
 }
 
-// IsActive determines if eventhub is active based on number of unprocessed events
-func (s *azureEventHubScaler) IsActive(ctx context.Context) (bool, error) {
-	runtimeInfo, err := s.client.GetRuntimeInformation(ctx)
-	if err != nil {
-		s.logger.Error(err, "unable to get runtimeInfo for isActive")
-		return false, fmt.Errorf("unable to get runtimeInfo for isActive: %w", err)
-	}
-
-	partitionIDs := runtimeInfo.PartitionIDs
-
-	for i := 0; i < len(partitionIDs); i++ {
-		partitionID := partitionIDs[i]
-
-		partitionRuntimeInfo, err := s.client.GetPartitionInformation(ctx, partitionID)
-		if err != nil {
-			return false, fmt.Errorf("unable to get partitionRuntimeInfo for metrics: %w", err)
-		}
-
-		unprocessedEventCount, _, err := s.GetUnprocessedEventCountInPartition(ctx, partitionRuntimeInfo)
-
-		if err != nil {
-			return false, fmt.Errorf("unable to get unprocessedEventCount for isActive: %w", err)
-		}
-
-		if unprocessedEventCount > s.metadata.activationThreshold {
-			return true, nil
-		}
-	}
-
-	return false, nil
-}
-
 // GetMetricSpecForScaling returns metric spec
 func (s *azureEventHubScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
 	externalMetric := &v2.ExternalMetricSource{
@@ -371,12 +339,33 @@ func (s *azureEventHubScaler) GetMetricSpecForScaling(context.Context) []v2.Metr
 	return []v2.MetricSpec{metricSpec}
 }
 
-// GetMetrics returns metric using total number of unprocessed events in event hub
-func (s *azureEventHubScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
+func getTotalLagRelatedToPartitionAmount(unprocessedEventsCount int64, partitionCount int64, threshold int64) int64 {
+	if (unprocessedEventsCount / threshold) > partitionCount {
+		return partitionCount * threshold
+	}
+
+	return unprocessedEventsCount
+}
+
+// Close closes Azure Event Hub Scaler
+func (s *azureEventHubScaler) Close(ctx context.Context) error {
+	if s.client != nil {
+		err := s.client.Close(ctx)
+		if err != nil {
+			s.logger.Error(err, "error closing azure event hub client")
+			return err
+		}
+	}
+
+	return nil
+}
+
+// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
+func (s *azureEventHubScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
 	totalUnprocessedEventCount := int64(0)
 	runtimeInfo, err := s.client.GetRuntimeInformation(ctx)
 	if err != nil {
-		return []external_metrics.ExternalMetricValue{}, fmt.Errorf("unable to get runtimeInfo for metrics: %w", err)
+		return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to get runtimeInfo for metrics: %w", err)
 	}
 
 	partitionIDs := runtimeInfo.PartitionIDs
@@ -385,14 +374,14 @@ func (s *azureEventHubScaler) GetMetrics(ctx context.Context, metricName string)
 		partitionID := partitionIDs[i]
 		partitionRuntimeInfo, err := s.client.GetPartitionInformation(ctx, partitionID)
 		if err != nil {
-			return []external_metrics.ExternalMetricValue{}, fmt.Errorf("unable to get partitionRuntimeInfo for metrics: %w", err)
+			return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to get partitionRuntimeInfo for metrics: %w", err)
 		}
 
 		unprocessedEventCount := int64(0)
 
 		unprocessedEventCount, checkpoint, err := s.GetUnprocessedEventCountInPartition(ctx, partitionRuntimeInfo)
 		if err != nil {
-			return []external_metrics.ExternalMetricValue{}, fmt.Errorf("unable to get unprocessedEventCount for metrics: %w", err)
+			return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to get unprocessedEventCount for metrics: %w", err)
 		}
 
 		totalUnprocessedEventCount += unprocessedEventCount
@@ -408,43 +397,5 @@ func (s *azureEventHubScaler) GetMetrics(ctx context.Context, metricName string)
 
 	metric := GenerateMetricInMili(metricName, float64(lagRelatedToPartitionCount))
 
-	return append([]external_metrics.ExternalMetricValue{}, metric), nil
-}
-
-func getTotalLagRelatedToPartitionAmount(unprocessedEventsCount int64, partitionCount int64, threshold int64) int64 {
-	if (unprocessedEventsCount / threshold) > partitionCount {
-		return partitionCount * threshold
-	}
-
-	return unprocessedEventsCount
-}
-
-// Close closes Azure Event Hub Scaler
-func (s *azureEventHubScaler) Close(ctx context.Context) error {
-	if s.client != nil {
-		err := s.client.Close(ctx)
-		if err != nil {
-			s.logger.Error(err, "error closing azure event hub client")
-			return err
-		}
-	}
-
-	return nil
-}
-
-// TODO merge isActive() and GetMetrics()
-func (s *azureEventHubScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
-	metrics, err := s.GetMetrics(ctx, metricName)
-	if err != nil {
-		s.logger.Error(err, "error getting metrics")
-		return []external_metrics.ExternalMetricValue{}, false, err
-	}
-
-	isActive, err := s.IsActive(ctx)
-	if err != nil {
-		s.logger.Error(err, "error getting activity status")
-		return []external_metrics.ExternalMetricValue{}, false, err
-	}
-
-	return metrics, isActive, nil
+	return []external_metrics.ExternalMetricValue{metric}, totalUnprocessedEventCount > s.metadata.activationThreshold, nil
 }
diff --git a/pkg/scalers/cron_scaler.go b/pkg/scalers/cron_scaler.go
index b8f3542fa60..9a0cbc5e1c6 100644
--- a/pkg/scalers/cron_scaler.go
+++ b/pkg/scalers/cron_scaler.go
@@ -114,36 +114,6 @@ func parseCronMetadata(config *ScalerConfig) (*cronMetadata, error) {
 	return &meta, nil
 }
 
-// IsActive checks if the startTime or endTime has reached
-func (s *cronScaler) IsActive(ctx context.Context) (bool, error) {
-	location, err := time.LoadLocation(s.metadata.timezone)
-	if err != nil {
-		return false, fmt.Errorf("unable to load timezone. Error: %w", err)
-	}
-
-	// Since we are considering the timestamp here and not the exact time, timezone does matter.
-	currentTime := time.Now().Unix()
-
-	nextStartTime, startTimecronErr := getCronTime(location, s.metadata.start)
-	if startTimecronErr != nil {
-		return false, fmt.Errorf("error initializing start cron: %w", startTimecronErr)
-	}
-
-	nextEndTime, endTimecronErr := getCronTime(location, s.metadata.end)
-	if endTimecronErr != nil {
-		return false, fmt.Errorf("error intializing end cron: %w", endTimecronErr)
-	}
-
-	switch {
-	case nextStartTime < nextEndTime && currentTime < nextStartTime:
-		return false, nil
-	case currentTime <= nextEndTime:
-		return true, nil
-	default:
-		return false, nil
-	}
-}
-
 func (s *cronScaler) Close(context.Context) error {
 	return nil
 }
@@ -169,37 +139,37 @@ func (s *cronScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
 	return []v2.MetricSpec{metricSpec}
 }
 
-// GetMetrics finds the current value of the metric
-func (s *cronScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
-	var currentReplicas = int64(defaultDesiredReplicas)
-	isActive, err := s.IsActive(ctx)
+// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
+func (s *cronScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
+	var defaultDesiredReplicas = int64(defaultDesiredReplicas)
+
+	location, err := time.LoadLocation(s.metadata.timezone)
 	if err != nil {
-		s.logger.Error(err, "error")
-		return []external_metrics.ExternalMetricValue{}, err
-	}
-	if isActive {
-		currentReplicas = s.metadata.desiredReplicas
+		return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to load timezone. Error: %w", err)
 	}
 
-	/*******************************************************************************/
-	metric := GenerateMetricInMili(metricName, float64(currentReplicas))
-
-	return []external_metrics.ExternalMetricValue{metric}, nil
-}
+	// Since we are considering the timestamp here and not the exact time, timezone does matter.
+	currentTime := time.Now().Unix()
 
-// TODO merge isActive() and GetMetrics()
-func (s *cronScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
-	metrics, err := s.GetMetrics(ctx, metricName)
-	if err != nil {
-		s.logger.Error(err, "error getting metrics")
-		return []external_metrics.ExternalMetricValue{}, false, err
+	nextStartTime, startTimecronErr := getCronTime(location, s.metadata.start)
+	if startTimecronErr != nil {
+		return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error initializing start cron: %w", startTimecronErr)
 	}
 
-	isActive, err := s.IsActive(ctx)
-	if err != nil {
-		s.logger.Error(err, "error getting activity status")
-		return []external_metrics.ExternalMetricValue{}, false, err
+	nextEndTime, endTimecronErr := getCronTime(location, s.metadata.end)
+	if endTimecronErr != nil {
+		return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error intializing end cron: %w", endTimecronErr)
 	}
 
-	return metrics, isActive, nil
+	switch {
+	case nextStartTime < nextEndTime && currentTime < nextStartTime:
+		metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
+		return []external_metrics.ExternalMetricValue{metric}, false, nil
+	case currentTime <= nextEndTime:
+		metric := GenerateMetricInMili(metricName, float64(s.metadata.desiredReplicas))
+		return []external_metrics.ExternalMetricValue{metric}, true, nil
+	default:
+		metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
+		return []external_metrics.ExternalMetricValue{metric}, false, nil
+	}
 }
diff --git a/pkg/scalers/external_scaler.go b/pkg/scalers/external_scaler.go
index d6a0a452cd9..4e1b788958b 100644
--- a/pkg/scalers/external_scaler.go
+++ b/pkg/scalers/external_scaler.go
@@ -128,22 +128,6 @@ func parseExternalScalerMetadata(config *ScalerConfig) (externalScalerMetadata,
 	return meta, nil
 }
 
-// IsActive checks if there are any messages in the subscription
-func (s *externalScaler) IsActive(ctx context.Context) (bool, error) {
-	grpcClient, err := getClientForConnectionPool(s.metadata)
-	if err != nil {
-		return false, err
-	}
-
-	response, err := grpcClient.IsActive(ctx, &s.scaledObjectRef)
-	if err != nil {
-		s.logger.Error(err, "error calling IsActive on external scaler")
-		return false, err
-	}
-
-	return response.Result, nil
-}
-
 func (s *externalScaler) Close(context.Context) error {
 	return nil
 }
@@ -184,18 +168,18 @@ func (s *externalScaler) GetMetricSpecForScaling(ctx context.Context) []v2.Metri
 	return result
 }
 
-// GetMetrics connects calls the gRPC interface to get the metrics with a specific name
-func (s *externalScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
+// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
+func (s *externalScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
 	var metrics []external_metrics.ExternalMetricValue
 	grpcClient, err := getClientForConnectionPool(s.metadata)
 	if err != nil {
-		return metrics, err
+		return []external_metrics.ExternalMetricValue{}, false, err
 	}
 
 	// Remove the sX- prefix as the external scaler shouldn't have to know about it
 	metricNameWithoutIndex, err := RemoveIndexFromMetricName(s.metadata.scalerIndex, metricName)
 	if err != nil {
-		return metrics, err
+		return []external_metrics.ExternalMetricValue{}, false, err
 	}
 
 	request := &pb.GetMetricsRequest{
@@ -203,35 +187,24 @@ func (s *externalScaler) GetMetrics(ctx context.Context, metricName string) ([]e
 		ScaledObjectRef: &s.scaledObjectRef,
 	}
 
-	response, err := grpcClient.GetMetrics(ctx, request)
+	metricsResponse, err := grpcClient.GetMetrics(ctx, request)
 	if err != nil {
 		s.logger.Error(err, "error")
-		return []external_metrics.ExternalMetricValue{}, err
+		return []external_metrics.ExternalMetricValue{}, false, err
 	}
 
-	for _, metricResult := range response.MetricValues {
+	for _, metricResult := range metricsResponse.MetricValues {
 		metric := GenerateMetricInMili(metricName, float64(metricResult.MetricValue))
 		metrics = append(metrics, metric)
 	}
 
-	return metrics, nil
-}
-
-// TODO merge isActive() and GetMetrics()
-func (s *externalScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
-	metrics, err := s.GetMetrics(ctx, metricName)
+	isActiveResponse, err := grpcClient.IsActive(ctx, &s.scaledObjectRef)
 	if err != nil {
-		s.logger.Error(err, "error getting metrics")
-		return []external_metrics.ExternalMetricValue{}, false, err
-	}
-
-	isActive, err := s.IsActive(ctx)
-	if err != nil {
-		s.logger.Error(err, "error getting activity status")
+		s.logger.Error(err, "error calling IsActive on external scaler")
 		return []external_metrics.ExternalMetricValue{}, false, err
 	}
 
-	return metrics, isActive, nil
+	return metrics, isActiveResponse.Result, nil
 }
 
 // handleIsActiveStream is the only writer to the active channel and will close it on return.
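The consolidated contract that all three scalers now satisfy returns the metric values and the activity flag from a single call, so callers such as the scalers cache no longer need separate `GetMetrics` and `IsActive` round trips. Below is a minimal, self-contained Go sketch of that pattern: only the shape of `GetMetricsAndActivity` mirrors the diff, while `metricValue`, `demoScaler`, and their fields are simplified, hypothetical stand-ins rather than the real KEDA types (the real signature uses `[]external_metrics.ExternalMetricValue`).

```go
// Standalone sketch of the consolidated GetMetricsAndActivity pattern.
// metricValue and demoScaler are simplified stand-ins, not KEDA types.
package main

import (
	"context"
	"fmt"
)

// metricValue stands in for external_metrics.ExternalMetricValue.
type metricValue struct {
	Name  string
	Value float64
}

// scaler captures the consolidated method: one call returns both the
// metric values and the activity flag.
type scaler interface {
	GetMetricsAndActivity(ctx context.Context, metricName string) ([]metricValue, bool, error)
}

// demoScaler reports a single lag metric and derives activity from the
// same value, instead of recomputing it in a separate IsActive call.
type demoScaler struct {
	lag                 int64
	activationThreshold int64
}

func (s *demoScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]metricValue, bool, error) {
	metric := metricValue{Name: metricName, Value: float64(s.lag)}
	return []metricValue{metric}, s.lag > s.activationThreshold, nil
}

func main() {
	var s scaler = &demoScaler{lag: 42, activationThreshold: 10}
	metrics, isActive, err := s.GetMetricsAndActivity(context.Background(), "s0-demo-lag")
	if err != nil {
		panic(err)
	}
	fmt.Printf("metrics=%v active=%v\n", metrics, isActive)
}
```

For context on the Event Hub path, `getTotalLagRelatedToPartitionAmount` caps the reported lag at `partitionCount * threshold`: for example, with 4 partitions, a threshold of 64 and 1,000 unprocessed events, the reported metric is capped at 4 * 64 = 256, which in effect keeps the desired replica count from exceeding the number of partitions.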