Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ScaledJob: introduce MultipleScalersCalculation #2016

Merged
merged 23 commits into from
Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c0c4814
Fix bug of ScaledJob multiple triggers
TsuyoshiUshio Aug 5, 2021
16671c8
fix pre-commit issue
TsuyoshiUshio Aug 6, 2021
12dc683
Update logs on scale handler and add crds
TsuyoshiUshio Aug 9, 2021
2416bfa
update change log
TsuyoshiUshio Aug 9, 2021
96b9907
Add test case for exceeding MaxReplicaCount in sum
TsuyoshiUshio Aug 9, 2021
d273f13
fix pre-commit
TsuyoshiUshio Aug 9, 2021
12a06ae
Update pkg/scaling/scale_handler.go
TsuyoshiUshio Sep 4, 2021
a9d9476
Update pkg/scaling/scale_handler.go
TsuyoshiUshio Sep 4, 2021
c5e4cd0
revert version and refactor scaling logic
TsuyoshiUshio Sep 4, 2021
e604a68
Merge branch 'main' into tsushi/fixscaledjobmultitrigger
TsuyoshiUshio Sep 4, 2021
784bc7c
Change option name from MultipleScalersOption to MultipleScalersCalcu…
TsuyoshiUshio Sep 4, 2021
15e39d9
update changelog for multiple triggers section
TsuyoshiUshio Sep 8, 2021
72147d2
Error Handling
TsuyoshiUshio Sep 8, 2021
93568a9
Refactor logging
TsuyoshiUshio Sep 8, 2021
96116f8
remove scaledjob prometheus
TsuyoshiUshio Sep 8, 2021
d5b519c
Update pkg/scaling/scaledjob/scale_metrics.go
TsuyoshiUshio Sep 11, 2021
49c6eb4
Update pkg/scaling/scaledjob/scale_metrics.go
TsuyoshiUshio Sep 11, 2021
3821105
Update pkg/scaling/scaledjob/scale_metrics.go
TsuyoshiUshio Sep 11, 2021
faa30df
Update pkg/scaling/scaledjob/scale_metrics_test.go
TsuyoshiUshio Sep 11, 2021
4c3bf05
Update pkg/scaling/scaledjob/scale_metrics.go
TsuyoshiUshio Sep 11, 2021
87740cf
Update CHANGELOG.md
TsuyoshiUshio Sep 11, 2021
6b7e6da
Move section to new section
TsuyoshiUshio Sep 11, 2021
6dda3c9
fix pre-commit white space issue
TsuyoshiUshio Sep 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
- Fix READY and ACTIVE fields of ScaledJob to show status when we run `kubectl get sj` ([#1855](https://github.com/kedacore/keda/pull/1855))
- Show HashiCorp Vault Address when using `kubectl get ta` or `kubectl get cta` ([#1862](https://github.com/kedacore/keda/pull/1862))
- Don't panic when HashiCorp Vault path doesn't exist ([#1864](https://github.com/kedacore/keda/pull/1864))
- Fix the issue when Scaled Jobs has multiple triggers ([#2016](https://github.com/kedacore/keda/pull/2016))
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved

### Breaking Changes

Expand Down
2 changes: 2 additions & 0 deletions apis/keda/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type ScalingStrategy struct {
CustomScalingRunningJobPercentage string `json:"customScalingRunningJobPercentage,omitempty"`
// +optional
PendingPodConditions []string `json:"pendingPodConditions,omitempty"`
// +optional
MultipleScalersCalculation string `json:"multipleScalersCalculation,omitempty"`
}

func init() {
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7362,6 +7362,8 @@ spec:
type: integer
customScalingRunningJobPercentage:
type: string
multipleScalersCalculation:
type: string
pendingPodConditions:
items:
type: string
Expand Down
124 changes: 124 additions & 0 deletions pkg/metrics/scaledjob_prometheus_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package metrics
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved

import (
"log"
"net/http"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
scaledJobMetricLabels = []string{"namespace", "metric", "scaledJob", "scaler", "scalerIndex"}
scaledJobScalerErrorsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "keda_operator",
Subsystem: "scaler",
Name: "errors_total",
Help: "Total number of errors for all scalers",
},
[]string{},
)
scaledJobScalerMetricsValue = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "keda_operator",
Subsystem: "scaler",
Name: "metrics_value",
Help: "Metric Value used for ScaledJob",
},
scaledJobMetricLabels,
)
scaledJobScalerErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "keda_operator",
Subsystem: "scaler",
Name: "errors",
Help: "Number of scaler errors",
},
scaledJobMetricLabels,
)
scaledJobErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "keda_operator",
Subsystem: "scaled_job",
Name: "errors",
Help: "Number of scaled object errors",
},
[]string{"namespace", "scaledObject"},
)
)

// PrometheusMetricServer the type of MetricsServer
type ScaledJobPrometheusMetricServer struct{}

var scaledJobRegistry *prometheus.Registry

func init() {
scaledJobRegistry = prometheus.NewRegistry()
scaledJobRegistry.MustRegister(scaledJobScalerErrorsTotal)
scaledJobRegistry.MustRegister(scaledJobScalerMetricsValue)
scaledJobRegistry.MustRegister(scaledJobScalerErrors)
scaledJobRegistry.MustRegister(scaledJobErrors)
}

// NewServer creates a new http serving instance of prometheus metrics
func (metricsServer ScaledJobPrometheusMetricServer) NewServer(address string, pattern string) {
http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("OK"))
if err != nil {
log.Fatalf("Unable to write to serve custom metrics for scaledJob: %v", err)
}
})
log.Printf("Starting ScaledJob metrics server at %v", address)
http.Handle(pattern, promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))

// initialize the total error metric
_, errscaler := scaledJobScalerErrorsTotal.GetMetricWith(prometheus.Labels{})
if errscaler != nil {
log.Fatalf("Unable to initialize scaledJob total error metrics as : %v", errscaler)
}

log.Fatal(http.ListenAndServe(address, nil))
}

// RecordScaledJobScalerMetric create a measurement of the external metric used by the ScaledJob
func (metricsServer ScaledJobPrometheusMetricServer) RecordScaledJobScalerMetric(namespace string, scaledJob string, scaler string, scalerIndex int, metric string, value int64) {
scaledJobScalerMetricsValue.With(metricsServer.getLabels(namespace, scaledJob, scaler, scalerIndex, metric)).Set(float64(value))
}

// RecordScaledJobScalerError counts the number of errors occurred in trying get an external metric used by the ScaledJob
func (metricsServer ScaledJobPrometheusMetricServer) RecordScaledJobScalerError(namespace string, scaledJob string, scaler string, scalerIndex int, metric string, err error) {
if err != nil {
scaledJobScalerErrors.With(metricsServer.getLabels(namespace, scaledJob, scaler, scalerIndex, metric)).Inc()
// scaledJobErrors.With(prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject}).Inc()
metricsServer.RecordScalerObjectError(namespace, scaledJob, err)
scaledJobScalerErrorsTotal.With(prometheus.Labels{}).Inc()
return
}
// initialize metric with 0 if not already set
_, errscaler := scaledJobScalerErrors.GetMetricWith(metricsServer.getLabels(namespace, scaledJob, scaler, scalerIndex, metric))
if errscaler != nil {
log.Fatalf("Unable to write to serve custom metrics for scaledJob: %v", errscaler)
}
}

// RecordScalerObjectError counts the number of errors with the scaled job
func (metricsServer ScaledJobPrometheusMetricServer) RecordScalerObjectError(namespace string, scaledJob string, err error) {
labels := prometheus.Labels{"namespace": namespace, "scaledJob": scaledJob}
if err != nil {
scaledJobErrors.With(labels).Inc()
return
}
// initialize metric with 0 if not already set
_, errscaledjob := scaledJobErrors.GetMetricWith(labels)
if errscaledjob != nil {
log.Fatalf("Unable to write to serve custom metrics for scaledJob: %v", errscaledjob)
return
}
}

func (metricsServer ScaledJobPrometheusMetricServer) getLabels(namespace string, scaledJob string, scaler string, scalerIndex int, metric string) prometheus.Labels {
return prometheus.Labels{"namespace": namespace, "scaledJob": scaledJob, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric}
}
96 changes: 2 additions & 94 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/go-logr/logr"
"k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -37,6 +36,7 @@ import (
"github.com/kedacore/keda/v2/pkg/scalers"
"github.com/kedacore/keda/v2/pkg/scaling/executor"
"github.com/kedacore/keda/v2/pkg/scaling/resolver"
"github.com/kedacore/keda/v2/pkg/scaling/scaledjob"
)

// ScaleHandler encapsulates the logic of calling the right scalers for
Expand Down Expand Up @@ -264,99 +264,7 @@ func (h *scaleHandler) isScaledObjectActive(ctx context.Context, scalers []scale
}

func (h *scaleHandler) isScaledJobActive(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
var queueLength int64
var targetAverageValue int64
var maxValue int64
isActive := false

// TODO refactor this, do chores, reduce the verbosity ie: V(1) and frequency of logs
// move relevant funcs getTargetAverageValue(), min() and divideWithCeil() out of scaler_handler.go
for _, scaler := range scalers {
scalerLogger := h.logger.WithValues("Scaler", scaler)

metricSpecs := scaler.GetMetricSpecForScaling()

// skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata)
// or skip cpu/memory resource scaler
if len(metricSpecs) < 1 || metricSpecs[0].External == nil {
continue
}

isTriggerActive, err := scaler.IsActive(ctx)

scalerLogger.Info("Active trigger", "isTriggerActive", isTriggerActive)

targetAverageValue = getTargetAverageValue(metricSpecs)

scalerLogger.Info("Scaler targetAverageValue", "targetAverageValue", targetAverageValue)

metrics, _ := scaler.GetMetrics(ctx, "queueLength", nil)

var metricValue int64

for _, m := range metrics {
if m.MetricName == "queueLength" {
metricValue, _ = m.Value.AsInt64()
queueLength += metricValue
}
}
scalerLogger.Info("QueueLength Metric value", "queueLength", queueLength)

scaler.Close()
if err != nil {
scalerLogger.V(1).Info("Error getting scale decision, but continue", "Error", err)
h.recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
continue
} else if isTriggerActive {
isActive = true
scalerLogger.Info("Scaler is active")
}
}
if targetAverageValue != 0 {
maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue))
}
h.logger.Info("Scaler maxValue", "maxValue", maxValue)
return isActive, queueLength, maxValue
}

func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 {
var targetAverageValue int64
var metricValue int64
var flag bool
for _, metric := range metricSpecs {
if metric.External.Target.AverageValue == nil {
metricValue = 0
} else {
metricValue, flag = metric.External.Target.AverageValue.AsInt64()
if !flag {
metricValue = 0
}
}

targetAverageValue += metricValue
}
count := int64(len(metricSpecs))
if count != 0 {
return targetAverageValue / count
}
return 0
}

func divideWithCeil(x, y int64) int64 {
ans := x / y
reminder := x % y
if reminder != 0 {
return ans + 1
}
return ans
}

// Min function for int64
func min(x, y int64) int64 {
if x > y {
return y
}
return x
return scaledjob.GetScaleMetrics(ctx, scalers, scaledJob, h.recorder)
}

// buildScalers returns list of Scalers for the specified triggers
Expand Down
31 changes: 0 additions & 31 deletions pkg/scaling/scale_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,6 @@ import (
"github.com/kedacore/keda/v2/pkg/scaling/executor"
)

func TestTargetAverageValue(t *testing.T) {
// count = 0
specs := []v2beta2.MetricSpec{}
targetAverageValue := getTargetAverageValue(specs)
assert.Equal(t, int64(0), targetAverageValue)
// 1 1
specs = []v2beta2.MetricSpec{
createMetricSpec(1),
createMetricSpec(1),
}
targetAverageValue = getTargetAverageValue(specs)
assert.Equal(t, int64(1), targetAverageValue)
// 5 5 3
specs = []v2beta2.MetricSpec{
createMetricSpec(5),
createMetricSpec(5),
createMetricSpec(3),
}
targetAverageValue = getTargetAverageValue(specs)
assert.Equal(t, int64(4), targetAverageValue)

// 5 5 4
specs = []v2beta2.MetricSpec{
createMetricSpec(5),
createMetricSpec(5),
createMetricSpec(3),
}
targetAverageValue = getTargetAverageValue(specs)
assert.Equal(t, int64(4), targetAverageValue)
}

func TestCheckScaledObjectScalersWithError(t *testing.T) {
ctrl := gomock.NewController(t)
client := mock_client.NewMockClient(ctrl)
Expand Down
Loading