Skip to content

Commit

Permalink
Support pod conditions for pending job count calculation
Browse files Browse the repository at this point in the history
Signed-off-by: Yaron Yarimi <yaron.yarimi@env0.com>
  • Loading branch information
yaronya committed Aug 2, 2021
1 parent 03d0f98 commit e33dcc5
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- Add Solace PubSub+ Event Broker Scaler ([#1945](https://github.com/kedacore/keda/pull/1945))
- Add fallback functionality ([#1872](https://github.com/kedacore/keda/issues/1872))
- Introduce Idle Replica Mode ([#1958](https://github.com/kedacore/keda/pull/1958))
- Support pod conditions for pending job count calculation ([#1970](https://github.com/kedacore/keda/pull/1970))

### Improvements

Expand Down
2 changes: 2 additions & 0 deletions api/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type ScalingStrategy struct {
CustomScalingQueueLengthDeduction *int32 `json:"customScalingQueueLengthDeduction,omitempty"`
// +optional
CustomScalingRunningJobPercentage string `json:"customScalingRunningJobPercentage,omitempty"`
// +optional
PendingPodConditions []string `json:"pendingPodConditions,omitempty"`
}

func init() {
Expand Down
41 changes: 39 additions & 2 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,34 @@ func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job) bool {
return false
}

func (e *scaleExecutor) areAllPendingPodConditionsFulfilled(j *batchv1.Job, pendingPodConditions []string) bool {
opts := []client.ListOption{
client.InNamespace(j.GetNamespace()),
client.MatchingLabels(map[string]string{"job-name": j.GetName()}),
}

pods := &corev1.PodList{}
err := e.client.List(context.TODO(), pods, opts...)

if err != nil {
return false
}

var fulfilledConditionsCount int

for _, pod := range pods.Items {
for _, pendingConditionType := range pendingPodConditions {
for _, podCondition := range pod.Status.Conditions {
if string(podCondition.Type) == pendingConditionType && podCondition.Status == corev1.ConditionTrue {
fulfilledConditionsCount++
}
}
}
}

return len(pendingPodConditions) == fulfilledConditionsCount
}

func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) int64 {
var pendingJobs int64

Expand All @@ -201,8 +229,17 @@ func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) in

for _, job := range jobs.Items {
job := job
if !e.isJobFinished(&job) && !e.isAnyPodRunningOrCompleted(&job) {
pendingJobs++

if !e.isJobFinished(&job) {
if len(scaledJob.Spec.ScalingStrategy.PendingPodConditions) > 0 {
if !e.areAllPendingPodConditionsFulfilled(&job, scaledJob.Spec.ScalingStrategy.PendingPodConditions) {
pendingJobs++
}
} else {
if !e.isAnyPodRunningOrCompleted(&job) {
pendingJobs++
}
}
}
}

Expand Down
93 changes: 93 additions & 0 deletions pkg/scaling/executor/scale_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,49 @@ func TestCleanUpDefaultValue(t *testing.T) {
assert.True(t, ok)
}

func TestGetPendingJobCount(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

pendingPodConditions := []string{"Ready", "PodScheduled"}
readyCondition := getPodCondition(v1.PodReady)
podScheduledCondition := getPodCondition(v1.PodScheduled)

testPendingJobTestData := []pendingJobTestData{
{PodStatus: v1.PodStatus{Phase: v1.PodSucceeded}, PendingJobCount: 0},
{PodStatus: v1.PodStatus{Phase: v1.PodRunning}, PendingJobCount: 0},
{PodStatus: v1.PodStatus{Phase: v1.PodFailed}, PendingJobCount: 1},
{PodStatus: v1.PodStatus{Phase: v1.PodPending}, PendingJobCount: 1},
{PodStatus: v1.PodStatus{Phase: v1.PodUnknown}, PendingJobCount: 1},
{PendingPodConditions: pendingPodConditions, PodStatus: v1.PodStatus{Conditions: []v1.PodCondition{}}, PendingJobCount: 1},
{PendingPodConditions: pendingPodConditions, PodStatus: v1.PodStatus{Conditions: []v1.PodCondition{readyCondition}}, PendingJobCount: 1},
{PendingPodConditions: pendingPodConditions, PodStatus: v1.PodStatus{Conditions: []v1.PodCondition{podScheduledCondition}}, PendingJobCount: 1},
{PendingPodConditions: pendingPodConditions, PodStatus: v1.PodStatus{Conditions: []v1.PodCondition{readyCondition, podScheduledCondition}}, PendingJobCount: 0},
}

for _, testData := range testPendingJobTestData {
client := getMockClientForTestingPendingPods(t, ctrl, testData.PodStatus)
scaleExecutor := getMockScaleExecutor(client)

scaledJob := getMockScaledJobWithPendingPodConditions(testData.PendingPodConditions)
result := scaleExecutor.getPendingJobCount(scaledJob)

assert.Equal(t, testData.PendingJobCount, result)
}
}

type mockJobParameter struct {
Name string
CompletionTime string
JobConditionType batchv1.JobConditionType
}

type pendingJobTestData struct {
PendingPodConditions []string
PodStatus v1.PodStatus
PendingJobCount int64
}

func getMockScaleExecutor(client *mock_client.MockClient) *scaleExecutor {
return &scaleExecutor{
client: client,
Expand Down Expand Up @@ -263,8 +300,21 @@ func getMockScaledJobWithDefaultStrategy(name string) *kedav1alpha1.ScaledJob {
return scaledJob
}

func getMockScaledJobWithPendingPodConditions(pendingPodConditions []string) *kedav1alpha1.ScaledJob {
scaledJob := &kedav1alpha1.ScaledJob{
Spec: kedav1alpha1.ScaledJobSpec{
ScalingStrategy: kedav1alpha1.ScalingStrategy{
PendingPodConditions: pendingPodConditions,
},
},
}

return scaledJob
}

func getMockClient(t *testing.T, ctrl *gomock.Controller, jobs *[]mockJobParameter, deletedJobName *map[string]string) *mock_client.MockClient {
client := mock_client.NewMockClient(ctrl)

client.EXPECT().
List(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(_ context.Context, list runtime.Object, _ ...runtimeclient.ListOption) {
j, ok := list.(*batchv1.JobList)
Expand All @@ -291,6 +341,42 @@ func getMockClient(t *testing.T, ctrl *gomock.Controller, jobs *[]mockJobParamet
return client
}

func getMockClientForTestingPendingPods(t *testing.T, ctrl *gomock.Controller, podStatus v1.PodStatus) *mock_client.MockClient {
client := mock_client.NewMockClient(ctrl)
gomock.InOrder(
// listing jobs
client.EXPECT().
List(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(_ context.Context, list runtime.Object, _ ...runtimeclient.ListOption) {
j, ok := list.(*batchv1.JobList)

if !ok {
t.Error("Cast failed on batchv1.JobList at mocking client.List()")
}

if ok {
j.Items = append(j.Items, batchv1.Job{})
}
}).
Return(nil),
// listing pods
client.EXPECT().
List(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(_ context.Context, list runtime.Object, _ ...runtimeclient.ListOption) {
p, ok := list.(*v1.PodList)

if !ok {
t.Error("Cast failed on v1.PodList at mocking client.List()")
}

if ok {
p.Items = append(p.Items, v1.Pod{Status: podStatus})
}
}).
Return(nil),
)

return client
}

func getJob(t *testing.T, name string, completionTime string, jobConditionType batchv1.JobConditionType) *batchv1.Job {
parsedCompletionTime, err := time.Parse(time.RFC3339, completionTime)
completionTimeT := metav1.NewTime(parsedCompletionTime)
Expand All @@ -313,3 +399,10 @@ func getJob(t *testing.T, name string, completionTime string, jobConditionType b
},
}
}

func getPodCondition(podConditionType v1.PodConditionType) v1.PodCondition {
return v1.PodCondition{
Type: podConditionType,
Status: v1.ConditionTrue,
}
}

0 comments on commit e33dcc5

Please sign in to comment.