Skip to content

Commit

Permalink
ScaledJob: introduce rolloutStrategy (#2164)
Browse files Browse the repository at this point in the history
Signed-off-by: etamarw <etamarw@wix.com>
Co-authored-by: Aaron Schlesinger <70865+arschles@users.noreply.github.com>
Co-authored-by: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com>
  • Loading branch information
3 people committed Nov 1, 2021
1 parent 3afb888 commit 197594d
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
### New

- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- ScaledJob: introduce `RolloutStrategy` ([#2164](https://github.com/kedacore/keda/pull/2164))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))
- Add Cassandra Scaler ([#2211](https://github.com/kedacore/keda/pull/2211))
- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))
Expand Down
1 change: 0 additions & 1 deletion adapter/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions apis/keda/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type ScaledJobSpec struct {
// +optional
FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`
// +optional
RolloutStrategy string `json:"rolloutStrategy,omitempty"`
// +optional
EnvSourceContainerName string `json:"envSourceContainerName,omitempty"`
// +optional
MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"`
Expand Down
1 change: 0 additions & 1 deletion apis/keda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
controller-gen.kubebuilder.io/version: v0.6.1
creationTimestamp: null
name: scaledjobs.keda.sh
spec:
Expand Down Expand Up @@ -7354,6 +7354,8 @@ spec:
pollingInterval:
format: int32
type: integer
rolloutStrategy:
type: string
scalingStrategy:
description: ScalingStrategy defines the strategy of Scaling
properties:
Expand Down
4 changes: 3 additions & 1 deletion config/crd/bases/keda.sh_triggerauthentications.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
controller-gen.kubebuilder.io/version: v0.6.1
creationTimestamp: null
name: triggerauthentications.keda.sh
spec:
Expand Down Expand Up @@ -89,6 +89,8 @@ spec:
type: object
mount:
type: string
namespace:
type: string
role:
type: string
secrets:
Expand Down
49 changes: 27 additions & 22 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// reconcileScaledJob implements reconciler logic for K8s Jobs based ScaledJob
func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
msg, err := r.deletePreviousVersionScaleJobs(logger, scaledJob)
msg, err := r.deletePreviousVersionScaleJobs(ctx, logger, scaledJob)
if err != nil {
return msg, err
}
Expand All @@ -155,30 +155,35 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log
return "ScaledJob is defined correctly and is ready to scaling", nil
}

// Delete Jobs owned by the previous version of the scaledJob
func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
opts := []client.ListOption{
client.InNamespace(scaledJob.GetNamespace()),
client.MatchingLabels(map[string]string{"scaledjob.keda.sh/name": scaledJob.GetName()}),
}
jobs := &batchv1.JobList{}
err := r.Client.List(context.TODO(), jobs, opts...)
if err != nil {
return "Cannot get list of Jobs owned by this scaledJob", err
}

if jobs.Size() > 0 {
logger.Info("Deleting jobs owned by the previous version of the scaledJob", "Number of jobs to delete", jobs.Size())
}
for _, job := range jobs.Items {
job := job
err = r.Client.Delete(context.TODO(), &job, client.PropagationPolicy(metav1.DeletePropagationBackground))
// Delete Jobs owned by the previous version of the scaledJob based on the rolloutStrategy given for this scaledJob, if any
func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
switch scaledJob.Spec.RolloutStrategy {
case "gradual":
logger.Info("RolloutStrategy: gradual, Not deleting jobs owned by the previous version of the scaleJob")
default:
opts := []client.ListOption{
client.InNamespace(scaledJob.GetNamespace()),
client.MatchingLabels(map[string]string{"scaledjob.keda.sh/name": scaledJob.GetName()}),
}
jobs := &batchv1.JobList{}
err := r.Client.List(ctx, jobs, opts...)
if err != nil {
return "Not able to delete job: " + job.Name, err
return "Cannot get list of Jobs owned by this scaledJob", err
}
}

return fmt.Sprintf("Deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", jobs.Size()), nil
if len(jobs.Items) > 0 {
logger.Info("RolloutStrategy: immediate, Deleting jobs owned by the previous version of the scaledJob", "numJobsToDelete", len(jobs.Items))
}
for _, job := range jobs.Items {
job := job
err = r.Client.Delete(ctx, &job, client.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil {
return "Not able to delete job: " + job.Name, err
}
}
return fmt.Sprintf("RolloutStrategy: immediate, deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", len(jobs.Items)), nil
}
return fmt.Sprintf("RolloutStrategy: %s", scaledJob.Spec.RolloutStrategy), nil
}

// requestScaleLoop request ScaleLoop handler for the respective ScaledJob
Expand Down

0 comments on commit 197594d

Please sign in to comment.