Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ScaledJob: introduce rolloutStrategy #2164

Merged
merged 14 commits into from
Nov 1, 2021
2 changes: 2 additions & 0 deletions apis/keda/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type ScaledJobSpec struct {
// +optional
FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`
// +optional
RolloutStrategy string `json:"rolloutStrategy,omitempty"`
// +optional
EnvSourceContainerName string `json:"envSourceContainerName,omitempty"`
// +optional
MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"`
Expand Down
45 changes: 25 additions & 20 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,30 +155,35 @@ func (r *ScaledJobReconciler) reconcileScaledJob(logger logr.Logger, scaledJob *
return "ScaledJob is defined correctly and is ready to scaling", nil
}

// Delete Jobs owned by the previous version of the scaledJob
// Delete Jobs owned by the previous version of the scaledJob based on the rolloutStartegy
etamarw marked this conversation as resolved.
Show resolved Hide resolved
func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
opts := []client.ListOption{
client.InNamespace(scaledJob.GetNamespace()),
client.MatchingLabels(map[string]string{"scaledjob.keda.sh/name": scaledJob.GetName()}),
}
jobs := &batchv1.JobList{}
err := r.Client.List(context.TODO(), jobs, opts...)
if err != nil {
return "Cannot get list of Jobs owned by this scaledJob", err
}

if jobs.Size() > 0 {
logger.Info("Deleting jobs owned by the previous version of the scaledJob", "Number of jobs to delete", jobs.Size())
}
for _, job := range jobs.Items {
job := job
err = r.Client.Delete(context.TODO(), &job, client.PropagationPolicy(metav1.DeletePropagationBackground))
switch scaledJob.Spec.RolloutStrategy {
default:
opts := []client.ListOption {
client.InNamespace(scaledJob.GetNamespace()),
client.MatchingLabels(map[string]string{"scaledjob.keda.sh/name": scaledJob.GetName()}),
}
jobs := &batchv1.JobList{}
err := r.Client.List(context.TODO(), jobs, opts...)
etamarw marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return "Not able to delete job: " + job.Name, err
return "Cannot get list of Jobs owned by this scaledJob", err
}
}

return fmt.Sprintf("Deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", jobs.Size()), nil
if len(jobs.Items) > 0 {
logger.Info("RolloutStrategy: immediate, Deleting jobs owned by the previous version of the scaledJob", "Number of jobs to delete", len(jobs.Items))
etamarw marked this conversation as resolved.
Show resolved Hide resolved
}
for _, job := range jobs.Items {
job := job
err = r.Client.Delete(context.TODO(), &job, client.PropagationPolicy(metav1.DeletePropagationBackground))
etamarw marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return "Not able to delete job: " + job.Name, err
}
}
return fmt.Sprintf("RolloutStrategy: immediate, deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", len(jobs.Items)), nil
case "gradual":
etamarw marked this conversation as resolved.
Show resolved Hide resolved
logger.Info("RolloutStrategy: gradual, Not deleteing jobs owned by the previous version of the scaleJob")
}
return fmt.Sprintf("RolloutStrategy: %s", scaledJob.Spec.RolloutStrategy), nil
}

// requestScaleLoop request ScaleLoop handler for the respective ScaledJob
Expand Down