diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b3992f7566..e3d1bd8f5dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - Add Solace PubSub+ Event Broker Scaler ([#1945](https://github.com/kedacore/keda/pull/1945)) - Add fallback functionality ([#1872](https://github.com/kedacore/keda/issues/1872)) - Introduce Idle Replica Mode ([#1958](https://github.com/kedacore/keda/pull/1958)) +- Support pod conditions for pending job count calculation ([#1970](https://github.com/kedacore/keda/pull/1970)) ### Improvements diff --git a/api/v1alpha1/scaledjob_types.go b/api/v1alpha1/scaledjob_types.go index 3f3de8640b5..6e92f074e6f 100644 --- a/api/v1alpha1/scaledjob_types.go +++ b/api/v1alpha1/scaledjob_types.go @@ -69,6 +69,8 @@ type ScalingStrategy struct { CustomScalingQueueLengthDeduction *int32 `json:"customScalingQueueLengthDeduction,omitempty"` // +optional CustomScalingRunningJobPercentage string `json:"customScalingRunningJobPercentage,omitempty"` + // +optional + PendingPodConditions []string `json:"pendingPodConditions,omitempty"` } func init() { diff --git a/pkg/scaling/executor/scale_jobs.go b/pkg/scaling/executor/scale_jobs.go index 76b81dc9a60..885cac80a00 100644 --- a/pkg/scaling/executor/scale_jobs.go +++ b/pkg/scaling/executor/scale_jobs.go @@ -162,7 +162,7 @@ func (e *scaleExecutor) getRunningJobCount(scaledJob *kedav1alpha1.ScaledJob) in return runningJobs } -func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job) bool { +func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job, s *kedav1alpha1.ScalingStrategy) bool { opts := []client.ListOption{ client.InNamespace(j.GetNamespace()), client.MatchingLabels(map[string]string{"job-name": j.GetName()}), @@ -176,9 +176,20 @@ func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job) bool { } for _, pod := range pods.Items { - if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodRunning { - return true + if len(s.PendingPodConditions) > 0 { + for _, pendingConditionType := range s.PendingPodConditions { + for _, podCondition := range pod.Status.Conditions { + if string(podCondition.Type) == pendingConditionType && podCondition.Status == corev1.ConditionTrue { + return true + } + } + } + } else { + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodRunning { + return true + } } + } return false @@ -201,7 +212,7 @@ func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) in for _, job := range jobs.Items { job := job - if !e.isJobFinished(&job) && !e.isAnyPodRunningOrCompleted(&job) { + if !e.isJobFinished(&job) && !e.isAnyPodRunningOrCompleted(&job, &scaledJob.Spec.ScalingStrategy) { pendingJobs++ } }