Support pod conditions for pending job count calculation #1970

Merged 3 commits on Aug 3, 2021

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -27,6 +27,7 @@
- Add Solace PubSub+ Event Broker Scaler ([#1945](https://github.com/kedacore/keda/pull/1945))
- Add fallback functionality ([#1872](https://github.com/kedacore/keda/issues/1872))
- Introduce Idle Replica Mode ([#1958](https://github.com/kedacore/keda/pull/1958))
- Support pod conditions for pending job count calculation ([#1970](https://github.com/kedacore/keda/pull/1970))
- Add new scaler for Selenium Grid ([#1971](https://github.com/kedacore/keda/pull/1971))
- Support using regex to select the queues in RabbitMQ Scaler ([#1957](https://github.com/kedacore/keda/pull/1957))

2 changes: 2 additions & 0 deletions api/v1alpha1/scaledjob_types.go
@@ -69,6 +69,8 @@ type ScalingStrategy struct {
    CustomScalingQueueLengthDeduction *int32 `json:"customScalingQueueLengthDeduction,omitempty"`
    // +optional
    CustomScalingRunningJobPercentage string `json:"customScalingRunningJobPercentage,omitempty"`
    // +optional
    PendingPodConditions []string `json:"pendingPodConditions,omitempty"`
}

func init() {
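For context, the new field slots into a ScaledJob's scalingStrategy block. A minimal sketch of how it could be set, assuming an otherwise ordinary ScaledJob manifest (everything except pendingPodConditions is a placeholder, not taken from this PR):

apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: example-scaledjob
spec:
  jobTargetRef:
    # ... the usual Job template ...
  scalingStrategy:
    pendingPodConditions:
      - "Ready"
      - "PodScheduled"
  triggers:
    # ... the usual scaler triggers ...

With this set, a job only stops counting as pending once its pods report every listed condition with status True, which is what the executor change below implements.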
40 changes: 38 additions & 2 deletions pkg/scaling/executor/scale_jobs.go
@@ -184,6 +184,33 @@ func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job) bool {
    return false
}

func (e *scaleExecutor) areAllPendingPodConditionsFulfilled(j *batchv1.Job, pendingPodConditions []string) bool {
    opts := []client.ListOption{
        client.InNamespace(j.GetNamespace()),
        client.MatchingLabels(map[string]string{"job-name": j.GetName()}),
    }

    pods := &corev1.PodList{}
    err := e.client.List(context.TODO(), pods, opts...)
    if err != nil {
        return false
    }

    var fulfilledConditionsCount int

    for _, pod := range pods.Items {
        for _, pendingConditionType := range pendingPodConditions {
            for _, podCondition := range pod.Status.Conditions {
                if string(podCondition.Type) == pendingConditionType && podCondition.Status == corev1.ConditionTrue {
                    fulfilledConditionsCount++
                }
            }
        }
    }

    return len(pendingPodConditions) == fulfilledConditionsCount
}

func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) int64 {
    var pendingJobs int64

@@ -201,8 +228,17 @@ func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) in

    for _, job := range jobs.Items {
        job := job
-       if !e.isJobFinished(&job) && !e.isAnyPodRunningOrCompleted(&job) {
-           pendingJobs++
+
+       if !e.isJobFinished(&job) {
+           if len(scaledJob.Spec.ScalingStrategy.PendingPodConditions) > 0 {
+               if !e.areAllPendingPodConditionsFulfilled(&job, scaledJob.Spec.ScalingStrategy.PendingPodConditions) {
+                   pendingJobs++
+               }
+           } else {
+               if !e.isAnyPodRunningOrCompleted(&job) {
+                   pendingJobs++
+               }
+           }
+       }
        }
    }

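The new check lists the pods labeled with the job's name and tallies the requested condition types that report status True; the job stays in the pending count until the tally matches the number of listed conditions. For reference, a pod satisfying the two conditions exercised in the tests below would carry a status roughly like this (illustrative pod-status excerpt, not part of this PR):

status:
  phase: Running
  conditions:
    - type: PodScheduled
      status: "True"
    - type: Ready
      status: "True"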
93 changes: 93 additions & 0 deletions pkg/scaling/executor/scale_jobs_test.go
@@ -193,12 +193,49 @@ func TestCleanUpDefaultValue(t *testing.T) {
    assert.True(t, ok)
}

func TestGetPendingJobCount(t *testing.T) {
    ctrl := gomock.NewController(t)
    defer ctrl.Finish()

    pendingPodConditions := []string{"Ready", "PodScheduled"}
    readyCondition := getPodCondition(v1.PodReady)
    podScheduledCondition := getPodCondition(v1.PodScheduled)

    testPendingJobTestData := []pendingJobTestData{
        {PodStatus: v1.PodStatus{Phase: v1.PodSucceeded}, PendingJobCount: 0},
        {PodStatus: v1.PodStatus{Phase: v1.PodRunning}, PendingJobCount: 0},
        {PodStatus: v1.PodStatus{Phase: v1.PodFailed}, PendingJobCount: 1},
        {PodStatus: v1.PodStatus{Phase: v1.PodPending}, PendingJobCount: 1},
        {PodStatus: v1.PodStatus{Phase: v1.PodUnknown}, PendingJobCount: 1},
        {PendingPodConditions: pendingPodConditions, PodStatus: v1.PodStatus{Conditions: []v1.PodCondition{}}, PendingJobCount: 1},
        {PendingPodConditions: pendingPodConditions, PodStatus: v1.PodStatus{Conditions: []v1.PodCondition{readyCondition}}, PendingJobCount: 1},
        {PendingPodConditions: pendingPodConditions, PodStatus: v1.PodStatus{Conditions: []v1.PodCondition{podScheduledCondition}}, PendingJobCount: 1},
        {PendingPodConditions: pendingPodConditions, PodStatus: v1.PodStatus{Conditions: []v1.PodCondition{readyCondition, podScheduledCondition}}, PendingJobCount: 0},
    }

    for _, testData := range testPendingJobTestData {
        client := getMockClientForTestingPendingPods(t, ctrl, testData.PodStatus)
        scaleExecutor := getMockScaleExecutor(client)

        scaledJob := getMockScaledJobWithPendingPodConditions(testData.PendingPodConditions)
        result := scaleExecutor.getPendingJobCount(scaledJob)

        assert.Equal(t, testData.PendingJobCount, result)
    }
}

type mockJobParameter struct {
    Name             string
    CompletionTime   string
    JobConditionType batchv1.JobConditionType
}

type pendingJobTestData struct {
    PendingPodConditions []string
    PodStatus            v1.PodStatus
    PendingJobCount      int64
}

func getMockScaleExecutor(client *mock_client.MockClient) *scaleExecutor {
    return &scaleExecutor{
        client: client,
@@ -263,8 +300,21 @@ func getMockScaledJobWithDefaultStrategy(name string) *kedav1alpha1.ScaledJob {
    return scaledJob
}

func getMockScaledJobWithPendingPodConditions(pendingPodConditions []string) *kedav1alpha1.ScaledJob {
    scaledJob := &kedav1alpha1.ScaledJob{
        Spec: kedav1alpha1.ScaledJobSpec{
            ScalingStrategy: kedav1alpha1.ScalingStrategy{
                PendingPodConditions: pendingPodConditions,
            },
        },
    }

    return scaledJob
}

func getMockClient(t *testing.T, ctrl *gomock.Controller, jobs *[]mockJobParameter, deletedJobName *map[string]string) *mock_client.MockClient {
    client := mock_client.NewMockClient(ctrl)

    client.EXPECT().
        List(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(_ context.Context, list runtime.Object, _ ...runtimeclient.ListOption) {
            j, ok := list.(*batchv1.JobList)
@@ -291,6 +341,42 @@ func getMockClient(t *testing.T, ctrl *gomock.Controller, jobs *[]mockJobParamet
    return client
}

func getMockClientForTestingPendingPods(t *testing.T, ctrl *gomock.Controller, podStatus v1.PodStatus) *mock_client.MockClient {
    client := mock_client.NewMockClient(ctrl)
    gomock.InOrder(
        // listing jobs
        client.EXPECT().
            List(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(_ context.Context, list runtime.Object, _ ...runtimeclient.ListOption) {
                j, ok := list.(*batchv1.JobList)

                if !ok {
                    t.Error("Cast failed on batchv1.JobList at mocking client.List()")
                }

                if ok {
                    j.Items = append(j.Items, batchv1.Job{})
                }
            }).
            Return(nil),
        // listing pods
        client.EXPECT().
            List(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(_ context.Context, list runtime.Object, _ ...runtimeclient.ListOption) {
                p, ok := list.(*v1.PodList)

                if !ok {
                    t.Error("Cast failed on v1.PodList at mocking client.List()")
                }

                if ok {
                    p.Items = append(p.Items, v1.Pod{Status: podStatus})
                }
            }).
            Return(nil),
    )

    return client
}

func getJob(t *testing.T, name string, completionTime string, jobConditionType batchv1.JobConditionType) *batchv1.Job {
    parsedCompletionTime, err := time.Parse(time.RFC3339, completionTime)
    completionTimeT := metav1.NewTime(parsedCompletionTime)
@@ -313,3 +399,10 @@ func getJob(t *testing.T, name string, completionTime string, jobConditionType b
        },
    }
}

func getPodCondition(podConditionType v1.PodConditionType) v1.PodCondition {
    return v1.PodCondition{
        Type:   podConditionType,
        Status: v1.ConditionTrue,
    }
}