Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate GetMetrics() and IsActive() to GetMetricsAndActivity() #4238

Merged
merged 5 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ New deprecation(s):
- **General**: Bump Golang to 1.19 ([#4094](https://github.com/kedacore/keda/issues/4094))
- **General**: Check that ScaledObject name is specified as part of a query for getting metrics ([#4088](https://github.com/kedacore/keda/pull/4088))
- **General**: Compare error with `errors.Is` ([#4004](https://github.com/kedacore/keda/pull/4004))
- **General**: Consolidate `GetMetrics` and `IsActive` to `GetMetricsAndActivity` for Azure Event Hub, Cron and External scalers ([#4015](https://github.com/kedacore/keda/issues/4015))
- **General**: Improve test coverage in `pkg/util` ([#3871](https://github.com/kedacore/keda/issues/3871))
- **General**: Pass deep copy object to scalers cache from the ScaledObject controller ([#4207](https://github.com/kedacore/keda/issues/4207))
- **General**: Review CodeQL rules and enable it on PRs ([#4032](https://github.com/kedacore/keda/pull/4032))
Expand Down
103 changes: 27 additions & 76 deletions pkg/scalers/azure_eventhub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,38 +327,6 @@ func GetUnprocessedEventCountWithoutCheckpoint(partitionInfo *eventhub.HubPartit
return 0
}

// IsActive determines if eventhub is active based on number of unprocessed events
func (s *azureEventHubScaler) IsActive(ctx context.Context) (bool, error) {
runtimeInfo, err := s.client.GetRuntimeInformation(ctx)
if err != nil {
s.logger.Error(err, "unable to get runtimeInfo for isActive")
return false, fmt.Errorf("unable to get runtimeInfo for isActive: %w", err)
}

partitionIDs := runtimeInfo.PartitionIDs

for i := 0; i < len(partitionIDs); i++ {
partitionID := partitionIDs[i]

partitionRuntimeInfo, err := s.client.GetPartitionInformation(ctx, partitionID)
if err != nil {
return false, fmt.Errorf("unable to get partitionRuntimeInfo for metrics: %w", err)
}

unprocessedEventCount, _, err := s.GetUnprocessedEventCountInPartition(ctx, partitionRuntimeInfo)

if err != nil {
return false, fmt.Errorf("unable to get unprocessedEventCount for isActive: %w", err)
}

if unprocessedEventCount > s.metadata.activationThreshold {
return true, nil
}
}

return false, nil
}

// GetMetricSpecForScaling returns metric spec
func (s *azureEventHubScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Expand All @@ -371,12 +339,33 @@ func (s *azureEventHubScaler) GetMetricSpecForScaling(context.Context) []v2.Metr
return []v2.MetricSpec{metricSpec}
}

// GetMetrics returns metric using total number of unprocessed events in event hub
func (s *azureEventHubScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
func getTotalLagRelatedToPartitionAmount(unprocessedEventsCount int64, partitionCount int64, threshold int64) int64 {
if (unprocessedEventsCount / threshold) > partitionCount {
return partitionCount * threshold
}

return unprocessedEventsCount
}

// Close closes Azure Event Hub Scaler
func (s *azureEventHubScaler) Close(ctx context.Context) error {
if s.client != nil {
err := s.client.Close(ctx)
if err != nil {
s.logger.Error(err, "error closing azure event hub client")
return err
}
}

return nil
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *azureEventHubScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
totalUnprocessedEventCount := int64(0)
runtimeInfo, err := s.client.GetRuntimeInformation(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("unable to get runtimeInfo for metrics: %w", err)
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to get runtimeInfo for metrics: %w", err)
}

partitionIDs := runtimeInfo.PartitionIDs
Expand All @@ -385,14 +374,14 @@ func (s *azureEventHubScaler) GetMetrics(ctx context.Context, metricName string)
partitionID := partitionIDs[i]
partitionRuntimeInfo, err := s.client.GetPartitionInformation(ctx, partitionID)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("unable to get partitionRuntimeInfo for metrics: %w", err)
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to get partitionRuntimeInfo for metrics: %w", err)
}

unprocessedEventCount := int64(0)

unprocessedEventCount, checkpoint, err := s.GetUnprocessedEventCountInPartition(ctx, partitionRuntimeInfo)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("unable to get unprocessedEventCount for metrics: %w", err)
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to get unprocessedEventCount for metrics: %w", err)
}

totalUnprocessedEventCount += unprocessedEventCount
Expand All @@ -408,43 +397,5 @@ func (s *azureEventHubScaler) GetMetrics(ctx context.Context, metricName string)

metric := GenerateMetricInMili(metricName, float64(lagRelatedToPartitionCount))

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func getTotalLagRelatedToPartitionAmount(unprocessedEventsCount int64, partitionCount int64, threshold int64) int64 {
if (unprocessedEventsCount / threshold) > partitionCount {
return partitionCount * threshold
}

return unprocessedEventsCount
}

// Close closes Azure Event Hub Scaler
func (s *azureEventHubScaler) Close(ctx context.Context) error {
if s.client != nil {
err := s.client.Close(ctx)
if err != nil {
s.logger.Error(err, "error closing azure event hub client")
return err
}
}

return nil
}

// TODO merge isActive() and GetMetrics()
func (s *azureEventHubScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
metrics, err := s.GetMetrics(ctx, metricName)
if err != nil {
s.logger.Error(err, "error getting metrics")
return []external_metrics.ExternalMetricValue{}, false, err
}

isActive, err := s.IsActive(ctx)
if err != nil {
s.logger.Error(err, "error getting activity status")
return []external_metrics.ExternalMetricValue{}, false, err
}

return metrics, isActive, nil
return []external_metrics.ExternalMetricValue{metric}, totalUnprocessedEventCount > s.metadata.activationThreshold, nil
}
80 changes: 25 additions & 55 deletions pkg/scalers/cron_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,36 +114,6 @@ func parseCronMetadata(config *ScalerConfig) (*cronMetadata, error) {
return &meta, nil
}

// IsActive checks if the startTime or endTime has reached
func (s *cronScaler) IsActive(ctx context.Context) (bool, error) {
location, err := time.LoadLocation(s.metadata.timezone)
if err != nil {
return false, fmt.Errorf("unable to load timezone. Error: %w", err)
}

// Since we are considering the timestamp here and not the exact time, timezone does matter.
currentTime := time.Now().Unix()

nextStartTime, startTimecronErr := getCronTime(location, s.metadata.start)
if startTimecronErr != nil {
return false, fmt.Errorf("error initializing start cron: %w", startTimecronErr)
}

nextEndTime, endTimecronErr := getCronTime(location, s.metadata.end)
if endTimecronErr != nil {
return false, fmt.Errorf("error intializing end cron: %w", endTimecronErr)
}

switch {
case nextStartTime < nextEndTime && currentTime < nextStartTime:
return false, nil
case currentTime <= nextEndTime:
return true, nil
default:
return false, nil
}
}

func (s *cronScaler) Close(context.Context) error {
return nil
}
Expand All @@ -169,37 +139,37 @@ func (s *cronScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
return []v2.MetricSpec{metricSpec}
}

// GetMetrics finds the current value of the metric
func (s *cronScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
var currentReplicas = int64(defaultDesiredReplicas)
isActive, err := s.IsActive(ctx)
// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *cronScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
var defaultDesiredReplicas = int64(defaultDesiredReplicas)

location, err := time.LoadLocation(s.metadata.timezone)
if err != nil {
s.logger.Error(err, "error")
return []external_metrics.ExternalMetricValue{}, err
}
if isActive {
currentReplicas = s.metadata.desiredReplicas
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to load timezone. Error: %w", err)
}

/*******************************************************************************/
metric := GenerateMetricInMili(metricName, float64(currentReplicas))

return []external_metrics.ExternalMetricValue{metric}, nil
}
// Since we are considering the timestamp here and not the exact time, timezone does matter.
currentTime := time.Now().Unix()

// TODO merge isActive() and GetMetrics()
func (s *cronScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
metrics, err := s.GetMetrics(ctx, metricName)
if err != nil {
s.logger.Error(err, "error getting metrics")
return []external_metrics.ExternalMetricValue{}, false, err
nextStartTime, startTimecronErr := getCronTime(location, s.metadata.start)
if startTimecronErr != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error initializing start cron: %w", startTimecronErr)
}

isActive, err := s.IsActive(ctx)
if err != nil {
s.logger.Error(err, "error getting activity status")
return []external_metrics.ExternalMetricValue{}, false, err
nextEndTime, endTimecronErr := getCronTime(location, s.metadata.end)
if endTimecronErr != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error intializing end cron: %w", endTimecronErr)
}

return metrics, isActive, nil
switch {
case nextStartTime < nextEndTime && currentTime < nextStartTime:
metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, false, nil
case currentTime <= nextEndTime:
metric := GenerateMetricInMili(metricName, float64(s.metadata.desiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, true, nil
default:
metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, false, nil
}
}
47 changes: 10 additions & 37 deletions pkg/scalers/external_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,22 +128,6 @@ func parseExternalScalerMetadata(config *ScalerConfig) (externalScalerMetadata,
return meta, nil
}

// IsActive checks if there are any messages in the subscription
func (s *externalScaler) IsActive(ctx context.Context) (bool, error) {
grpcClient, err := getClientForConnectionPool(s.metadata)
if err != nil {
return false, err
}

response, err := grpcClient.IsActive(ctx, &s.scaledObjectRef)
if err != nil {
s.logger.Error(err, "error calling IsActive on external scaler")
return false, err
}

return response.Result, nil
}

func (s *externalScaler) Close(context.Context) error {
return nil
}
Expand Down Expand Up @@ -184,54 +168,43 @@ func (s *externalScaler) GetMetricSpecForScaling(ctx context.Context) []v2.Metri
return result
}

// GetMetrics connects calls the gRPC interface to get the metrics with a specific name
func (s *externalScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *externalScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
var metrics []external_metrics.ExternalMetricValue
grpcClient, err := getClientForConnectionPool(s.metadata)
if err != nil {
return metrics, err
return []external_metrics.ExternalMetricValue{}, false, err
}

// Remove the sX- prefix as the external scaler shouldn't have to know about it
metricNameWithoutIndex, err := RemoveIndexFromMetricName(s.metadata.scalerIndex, metricName)
if err != nil {
return metrics, err
return []external_metrics.ExternalMetricValue{}, false, err
}

request := &pb.GetMetricsRequest{
MetricName: metricNameWithoutIndex,
ScaledObjectRef: &s.scaledObjectRef,
}

response, err := grpcClient.GetMetrics(ctx, request)
metricsResponse, err := grpcClient.GetMetrics(ctx, request)
if err != nil {
s.logger.Error(err, "error")
return []external_metrics.ExternalMetricValue{}, err
return []external_metrics.ExternalMetricValue{}, false, err
}

for _, metricResult := range response.MetricValues {
for _, metricResult := range metricsResponse.MetricValues {
metric := GenerateMetricInMili(metricName, float64(metricResult.MetricValue))
metrics = append(metrics, metric)
}

return metrics, nil
}

// TODO merge isActive() and GetMetrics()
func (s *externalScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
metrics, err := s.GetMetrics(ctx, metricName)
isActiveResponse, err := grpcClient.IsActive(ctx, &s.scaledObjectRef)
if err != nil {
s.logger.Error(err, "error getting metrics")
return []external_metrics.ExternalMetricValue{}, false, err
}

isActive, err := s.IsActive(ctx)
if err != nil {
s.logger.Error(err, "error getting activity status")
s.logger.Error(err, "error calling IsActive on external scaler")
return []external_metrics.ExternalMetricValue{}, false, err
}

return metrics, isActive, nil
return metrics, isActiveResponse.Result, nil
}

// handleIsActiveStream is the only writer to the active channel and will close it on return.
Expand Down