diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6b3992f7566..e3d1bd8f5dd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -27,6 +27,7 @@
 - Add Solace PubSub+ Event Broker Scaler ([#1945](https://github.com/kedacore/keda/pull/1945))
 - Add fallback functionality ([#1872](https://github.com/kedacore/keda/issues/1872))
 - Introduce Idle Replica Mode ([#1958](https://github.com/kedacore/keda/pull/1958))
+- Support pod conditions for pending job count calculation ([#1970](https://github.com/kedacore/keda/pull/1970))
 
 ### Improvements
 
diff --git a/api/v1alpha1/scaledjob_types.go b/api/v1alpha1/scaledjob_types.go
index 3f3de8640b5..6e92f074e6f 100644
--- a/api/v1alpha1/scaledjob_types.go
+++ b/api/v1alpha1/scaledjob_types.go
@@ -69,6 +69,8 @@ type ScalingStrategy struct {
 	CustomScalingQueueLengthDeduction *int32 `json:"customScalingQueueLengthDeduction,omitempty"`
 	// +optional
 	CustomScalingRunningJobPercentage string `json:"customScalingRunningJobPercentage,omitempty"`
+	// +optional
+	PendingPodConditions []string `json:"pendingPodConditions,omitempty"`
 }
 
 func init() {
diff --git a/pkg/scaling/executor/scale_jobs.go b/pkg/scaling/executor/scale_jobs.go
index 76b81dc9a60..0854137ce11 100644
--- a/pkg/scaling/executor/scale_jobs.go
+++ b/pkg/scaling/executor/scale_jobs.go
@@ -184,6 +184,34 @@ func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job) bool {
 	return false
 }
 
+func (e *scaleExecutor) areAllPendingPodConditionsFulfilled(j *batchv1.Job, pendingPodConditions []string) bool {
+	opts := []client.ListOption{
+		client.InNamespace(j.GetNamespace()),
+		client.MatchingLabels(map[string]string{"job-name": j.GetName()}),
+	}
+
+	pods := &corev1.PodList{}
+	err := e.client.List(context.TODO(), pods, opts...)
+
+	if err != nil {
+		return false
+	}
+
+	var fulfilledConditionsCount int
+
+	for _, pod := range pods.Items {
+		for _, pendingConditionType := range pendingPodConditions {
+			for _, podCondition := range pod.Status.Conditions {
+				if string(podCondition.Type) == pendingConditionType && podCondition.Status == corev1.ConditionTrue {
+					fulfilledConditionsCount++
+				}
+			}
+		}
+	}
+
+	return len(pendingPodConditions) == fulfilledConditionsCount
+}
+
 func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) int64 {
 	var pendingJobs int64
 
@@ -201,8 +229,17 @@ func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) in
 
 	for _, job := range jobs.Items {
 		job := job
-		if !e.isJobFinished(&job) && !e.isAnyPodRunningOrCompleted(&job) {
-			pendingJobs++
+
+		if !e.isJobFinished(&job) {
+			if len(scaledJob.Spec.ScalingStrategy.PendingPodConditions) > 0 {
+				if !e.areAllPendingPodConditionsFulfilled(&job, scaledJob.Spec.ScalingStrategy.PendingPodConditions) {
+					pendingJobs++
+				}
+			} else {
+				if !e.isAnyPodRunningOrCompleted(&job) {
+					pendingJobs++
+				}
+			}
 		}
 	}
 
diff --git a/pkg/scaling/executor/scale_jobs_test.go b/pkg/scaling/executor/scale_jobs_test.go
index d558a09940f..84cd53b53e7 100644
--- a/pkg/scaling/executor/scale_jobs_test.go
+++ b/pkg/scaling/executor/scale_jobs_test.go
@@ -193,12 +193,49 @@ func TestCleanUpDefaultValue(t *testing.T) {
 	assert.True(t, ok)
 }
 
+func TestGetPendingJobCount(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	pendingPodConditions := []string{"Ready", "PodScheduled"}
+	readyCondition := getPodCondition(v1.PodReady)
+	podScheduledCondition := getPodCondition(v1.PodScheduled)
+
+	testPendingJobTestData := []pendingJobTestData{
+		{PodStatus: v1.PodStatus{Phase: v1.PodSucceeded}, PendingJobCount: 0},
+		{PodStatus: v1.PodStatus{Phase: v1.PodRunning}, PendingJobCount: 0},
+		{PodStatus: v1.PodStatus{Phase: v1.PodFailed}, PendingJobCount: 1},
+		{PodStatus: v1.PodStatus{Phase: v1.PodPending}, PendingJobCount: 1},
+		{PodStatus: v1.PodStatus{Phase: v1.PodUnknown}, PendingJobCount: 1},
+		{PendingPodConditions: pendingPodConditions, PodStatus: v1.PodStatus{Conditions: []v1.PodCondition{}}, PendingJobCount: 1},
+		{PendingPodConditions: pendingPodConditions, PodStatus: v1.PodStatus{Conditions: []v1.PodCondition{readyCondition}}, PendingJobCount: 1},
+		{PendingPodConditions: pendingPodConditions, PodStatus: v1.PodStatus{Conditions: []v1.PodCondition{podScheduledCondition}}, PendingJobCount: 1},
+		{PendingPodConditions: pendingPodConditions, PodStatus: v1.PodStatus{Conditions: []v1.PodCondition{readyCondition, podScheduledCondition}}, PendingJobCount: 0},
+	}
+
+	for _, testData := range testPendingJobTestData {
+		client := getMockClientForTestingPendingPods(t, ctrl, testData.PodStatus)
+		scaleExecutor := getMockScaleExecutor(client)
+
+		scaledJob := getMockScaledJobWithPendingPodConditions(testData.PendingPodConditions)
+		result := scaleExecutor.getPendingJobCount(scaledJob)
+
+		assert.Equal(t, testData.PendingJobCount, result)
+	}
+}
+
 type mockJobParameter struct {
 	Name             string
 	CompletionTime   string
 	JobConditionType batchv1.JobConditionType
 }
 
+type pendingJobTestData struct {
+	PendingPodConditions []string
+	PodStatus            v1.PodStatus
+	PendingJobCount      int64
+}
+
 func getMockScaleExecutor(client *mock_client.MockClient) *scaleExecutor {
 	return &scaleExecutor{
 		client: client,
@@ -263,8 +300,21 @@ func getMockScaledJobWithDefaultStrategy(name string) *kedav1alpha1.ScaledJob {
 	return scaledJob
 }
 
+func getMockScaledJobWithPendingPodConditions(pendingPodConditions []string) *kedav1alpha1.ScaledJob {
+	scaledJob := &kedav1alpha1.ScaledJob{
+		Spec: kedav1alpha1.ScaledJobSpec{
+			ScalingStrategy: kedav1alpha1.ScalingStrategy{
+				PendingPodConditions: pendingPodConditions,
+			},
+		},
+	}
+
+	return scaledJob
+}
+
 func getMockClient(t *testing.T, ctrl *gomock.Controller, jobs *[]mockJobParameter, deletedJobName *map[string]string) *mock_client.MockClient {
 	client := mock_client.NewMockClient(ctrl)
+
 	client.EXPECT().
 		List(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(_ context.Context, list runtime.Object, _ ...runtimeclient.ListOption) {
 			j, ok := list.(*batchv1.JobList)
@@ -291,6 +341,42 @@ func getMockClient(t *testing.T, ctrl *gomock.Controller, jobs *[]mockJobParamet
 	return client
 }
 
+func getMockClientForTestingPendingPods(t *testing.T, ctrl *gomock.Controller, podStatus v1.PodStatus) *mock_client.MockClient {
+	client := mock_client.NewMockClient(ctrl)
+	gomock.InOrder(
+		// listing jobs
+		client.EXPECT().
+			List(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(_ context.Context, list runtime.Object, _ ...runtimeclient.ListOption) {
+				j, ok := list.(*batchv1.JobList)
+
+				if !ok {
+					t.Error("Cast failed on batchv1.JobList at mocking client.List()")
+				}
+
+				if ok {
+					j.Items = append(j.Items, batchv1.Job{})
+				}
+			}).
+			Return(nil),
+		// listing pods
+		client.EXPECT().
+			List(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(_ context.Context, list runtime.Object, _ ...runtimeclient.ListOption) {
+				p, ok := list.(*v1.PodList)
+
+				if !ok {
+					t.Error("Cast failed on v1.PodList at mocking client.List()")
+				}
+
+				if ok {
+					p.Items = append(p.Items, v1.Pod{Status: podStatus})
+				}
+			}).
+			Return(nil),
+	)
+
+	return client
+}
+
 func getJob(t *testing.T, name string, completionTime string, jobConditionType batchv1.JobConditionType) *batchv1.Job {
 	parsedCompletionTime, err := time.Parse(time.RFC3339, completionTime)
 	completionTimeT := metav1.NewTime(parsedCompletionTime)
@@ -313,3 +399,10 @@ func getJob(t *testing.T, name string, completionTime string, jobConditionType b
 		},
 	}
 }
+
+func getPodCondition(podConditionType v1.PodConditionType) v1.PodCondition {
+	return v1.PodCondition{
+		Type:   podConditionType,
+		Status: v1.ConditionTrue,
+	}
+}
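
Usage note (not part of the patch): with this change, a ScaledJob whose scaling strategy sets `pendingPodConditions` keeps counting a job as pending until every listed pod condition reports `True` on the job's pods; when the list is empty, the previous "any pod running or completed" check still applies. Below is a minimal sketch of how the new API field might be populated from Go, reusing only the types touched in this diff. The `main` wrapper and the `github.com/kedacore/keda/v2/api/v1alpha1` import path are assumptions for illustration, and the `Ready`/`PodScheduled` values mirror the test cases above.

```go
package main

import (
	"fmt"

	// Assumed module path for the KEDA v2 API types extended in this diff.
	kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
)

func main() {
	// A ScaledJob that only stops counting a job as "pending" once its pod
	// reports both PodScheduled=True and Ready=True, per
	// areAllPendingPodConditionsFulfilled in scale_jobs.go above.
	scaledJob := &kedav1alpha1.ScaledJob{
		Spec: kedav1alpha1.ScaledJobSpec{
			ScalingStrategy: kedav1alpha1.ScalingStrategy{
				PendingPodConditions: []string{"Ready", "PodScheduled"},
			},
		},
	}

	fmt.Println(scaledJob.Spec.ScalingStrategy.PendingPodConditions)
}
```

Semantically, `areAllPendingPodConditionsFulfilled` counts one fulfilled entry per (pod, listed condition) pair and compares the total against `len(pendingPodConditions)`, so for the single-pod jobs exercised in the tests this reduces to "all listed conditions are True".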