diff --git a/CHANGELOG.md b/CHANGELOG.md index e2a40e7dc6f..37aad88c54d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ ### Improvements -- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) +- Fix READY and ACTIVE fields of ScaledJob to show status when we run `kubectl get sj` ([#1855](https://github.com/kedacore/keda/pull/1855)) ### Breaking Changes diff --git a/controllers/scaledjob_controller.go b/controllers/scaledjob_controller.go index f048916925d..6886f6dfde7 100644 --- a/controllers/scaledjob_controller.go +++ b/controllers/scaledjob_controller.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + kedacontrollerutil "github.com/kedacore/keda/v2/controllers/util" "github.com/kedacore/keda/v2/pkg/eventreason" corev1 "k8s.io/api/core/v1" @@ -80,47 +81,63 @@ func (r *ScaledJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, err } - var errMsg string - if scaledJob.Spec.JobTargetRef != nil { - reqLogger.Info("Detected ScaleType = Job") - conditions := scaledJob.Status.Conditions.DeepCopy() - msg, err := r.reconcileScaledJob(reqLogger, scaledJob) - if err != nil { - reqLogger.Error(err, msg) - conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg) - conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed") - r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg) - } else { - wasReady := conditions.GetReadyCondition() - if wasReady.IsFalse() || wasReady.IsUnknown() { - r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling") - } - reqLogger.V(1).Info(msg) - conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg) + // ensure Status Conditions are initialized + if !scaledJob.Status.Conditions.AreInitialized() { + conditions := kedav1alpha1.GetInitializedConditions() + if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, conditions); err != nil { + return ctrl.Result{}, err } + } + // Check jobTargetRef is specified + if scaledJob.Spec.JobTargetRef == nil { + errMsg := "scaledJob.spec.jobTargetRef is not set" + err := fmt.Errorf(errMsg) + reqLogger.Error(err, "scaledJob.spec.jobTargetRef not found") return ctrl.Result{}, err } + msg, err := r.reconcileScaledJob(reqLogger, scaledJob) + conditions := scaledJob.Status.Conditions.DeepCopy() + if err != nil { + reqLogger.Error(err, msg) + conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg) + conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed") + r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg) + } else { + wasReady := conditions.GetReadyCondition() + if wasReady.IsFalse() || wasReady.IsUnknown() { + r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling") + } + reqLogger.V(1).Info(msg) + conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg) + } - errMsg = "scaledJob.Spec.JobTargetRef is not set" - err = fmt.Errorf(errMsg) - reqLogger.Error(err, "scaledJob.Spec.JobTargetRef not found") + if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, &conditions); err != nil { + return ctrl.Result{}, err + } return ctrl.Result{}, err } -// reconcileJobType implemets reconciler logic for K8s Jobs based ScaleObject +// reconcileScaledJob implements reconciler logic for K8s Jobs based ScaledJob func (r *ScaledJobReconciler) reconcileScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) { msg, err := r.deletePreviousVersionScaleJobs(logger, scaledJob) if err != nil { return msg, err } + // Check ScaledJob is Ready or not + _, err = r.scaleHandler.GetScalers(scaledJob) + if err != nil { + logger.Error(err, "Error getting scalers") + return "Failed to ensure ScaledJob is correctly created", err + } + // scaledJob was created or modified - let's start a new ScaleLoop err = r.requestScaleLoop(logger, scaledJob) if err != nil { return "Failed to start a new scale loop with scaling logic", err } - logger.Info("Initializing Scaling logic according to ScaledObject Specification") + logger.Info("Initializing Scaling logic according to ScaledJob Specification") return "ScaledJob is defined correctly and is ready to scaling", nil } diff --git a/pkg/scaling/executor/scale_jobs.go b/pkg/scaling/executor/scale_jobs.go index 57eb5b637d1..76b81dc9a60 100644 --- a/pkg/scaling/executor/scale_jobs.go +++ b/pkg/scaling/executor/scale_jobs.go @@ -50,6 +50,21 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al logger.V(1).Info("No change in activity") } + condition := scaledJob.Status.Conditions.GetActiveCondition() + if condition.IsUnknown() || condition.IsTrue() != isActive { + if isActive { + if err := e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionTrue, "ScalerActive", "Scaling is performed because triggers are active"); err != nil { + logger.Error(err, "Error setting active condition when triggers are active") + return + } + } else { + if err := e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active"); err != nil { + logger.Error(err, "Error setting active condition when triggers are not active") + return + } + } + } + err := e.cleanUp(scaledJob) if err != nil { logger.Error(err, "Failed to cleanUp jobs") @@ -98,10 +113,10 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure } - // Set ScaledObject instance as the owner and controller + // Set ScaledJob instance as the owner and controller err := controllerutil.SetControllerReference(scaledJob, job, e.reconcilerScheme) if err != nil { - logger.Error(err, "Failed to set ScaledObject as the owner of the new Job") + logger.Error(err, "Failed to set ScaledJob as the owner of the new Job") } err = e.client.Create(context.TODO(), job)