Skip to content

Commit

Permalink
Deactivate Workload if any of its AdmissionCheck get Rejected (#2363)
Browse files Browse the repository at this point in the history
* Deactivate Workload when a AdmissionCheck gets Rejected

* Update integration tests, add an another scenario to provisioning integration tests

* Do not set Evicted condition right after deactivating a Workload and do it in a next Reconcile loop iteration

* Updated tests

* Add comment to the workload controller

* Change the order of handling Retry and Rejected checks in the workload controller

* Add test scenario, improve readability of integration tests

* Improve readability, update integration tests

* Update unit tests

* Delete redundant workload controller logic, update integration tests, update unit tests

* Rebase changes
  • Loading branch information
PBundyra committed Jun 12, 2024
1 parent 8370e98 commit 6ecaa87
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 153 deletions.
3 changes: 1 addition & 2 deletions pkg/controller/admissionchecks/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
attempt := int32(1)
shouldCreatePr := false
if exists {
attempt = getAttempt(log, oldPr, wl.Name, checkName)
if apimeta.IsStatusConditionTrue(oldPr.Status.Conditions, autoscaling.Failed) {
if isFailed(oldPr) {
attempt = getAttempt(log, oldPr, wl.Name, checkName)
if attempt <= MaxRetries {
prFailed := apimeta.FindStatusCondition(oldPr.Status.Conditions, autoscaling.Failed)
Expand Down
47 changes: 17 additions & 30 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -49,7 +48,6 @@ import (
config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
Expand Down Expand Up @@ -277,28 +275,6 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return r.reconcileNotReadyTimeout(ctx, req, &wl)
}

// At this point the workload is not Admitted, if it has rejected admission checks mark it as finished.
if rejectedChecks := workload.GetRejectedChecks(&wl); len(rejectedChecks) > 0 {
log.V(3).Info("Workload has Rejected admission checks, Finish with failure")
err := workload.UpdateStatus(ctx, r.client, &wl, kueue.WorkloadFinished,
metav1.ConditionTrue,
kueue.WorkloadFinishedReasonAdmissionChecksRejected,
fmt.Sprintf("Admission checks %v are rejected", rejectedChecks),
constants.KueueName)
if err == nil {
for _, owner := range wl.OwnerReferences {
uowner := unstructured.Unstructured{}
uowner.SetKind(owner.Kind)
uowner.SetAPIVersion(owner.APIVersion)
uowner.SetName(owner.Name)
uowner.SetNamespace(wl.Namespace)
uowner.SetUID(owner.UID)
r.recorder.Eventf(&uowner, corev1.EventTypeNormal, "WorkloadFinished", "Admission checks %v are rejected", rejectedChecks)
}
}
return ctrl.Result{}, err
}

switch {
case !lqExists:
log.V(3).Info("Workload is inadmissible because of missing LocalQueue", "localQueue", klog.KRef(wl.Namespace, wl.Spec.QueueName))
Expand Down Expand Up @@ -345,20 +321,31 @@ func isDisabledRequeuedByReason(w *kueue.Workload, reason string) bool {
return cond != nil && cond.Status == metav1.ConditionFalse && cond.Reason == reason
}

// reconcileCheckBasedEviction returns true if Workload has been deactivated or evicted
func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl *kueue.Workload) (bool, error) {
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) || !workload.HasRetryOrRejectedChecks(wl) {
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) || (!workload.HasRetryChecks(wl) && !workload.HasRejectedChecks(wl)) {
return false, nil
}
log := ctrl.LoggerFrom(ctx)
log.V(3).Info("Workload is evicted due to admission checks")
if workload.HasRejectedChecks(wl) {
wl.Spec.Active = ptr.To(false)
if err := r.client.Update(ctx, wl); err != nil {
return false, err
}
rejectedCheck := workload.RejectedChecks(wl)[0]
r.recorder.Eventf(wl, corev1.EventTypeWarning, "AdmissionCheckRejected", "Deactivating workload because AdmissionCheck for %v was Rejected: %s", rejectedCheck.Name, rejectedCheck.Message)
return true, nil
}
// at this point we know a Workload has at least one Retry AdmissionCheck
message := "At least one admission check is false"
workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByAdmissionCheck, message)
err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
if err == nil {
cqName, _ := r.queues.ClusterQueueForWorkload(wl)
workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByAdmissionCheck, message)
if err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true); err != nil {
return false, client.IgnoreNotFound(err)
}
return true, client.IgnoreNotFound(err)
cqName, _ := r.queues.ClusterQueueForWorkload(wl)
workload.ReportEvictedWorkload(r.recorder, wl, cqName, kueue.WorkloadEvictedByAdmissionCheck, message)
return true, nil
}

func (r *WorkloadReconciler) reconcileSyncAdmissionChecks(ctx context.Context, wl *kueue.Workload, cq *kueue.ClusterQueue) (bool, error) {
Expand Down
66 changes: 52 additions & 14 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,37 +454,34 @@ func TestReconcile(t *testing.T) {
DeletionTimestamp(testStartTime).
Obj(),
},
"unadmitted workload with rejected checks": {
"unadmitted workload with rejected checks gets deactivated": {
workload: utiltesting.MakeWorkload("wl", "ns").
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Active(false).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
}).
Condition(metav1.Condition{
Type: "Finished",
Status: "True",
Reason: kueue.WorkloadFinishedReasonAdmissionChecksRejected,
Message: "Admission checks [check] are rejected",
}).
Obj(),
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "ns", Name: "ownername"},
EventType: "Normal",
Reason: "WorkloadFinished",
Message: "Admission checks [check] are rejected",
Key: types.NamespacedName{Namespace: "ns", Name: "wl"},
EventType: "Warning",
Reason: "AdmissionCheckRejected",
Message: "Deactivating workload because AdmissionCheck for check was Rejected: ",
},
},
},
"admitted workload with rejected checks": {
"admitted workload with rejected checks gets deactivated": {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
Expand All @@ -497,11 +494,52 @@ func TestReconcile(t *testing.T) {
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
Active(false).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
}).
Conditions(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
Reason: "AdmittedByTest",
Message: "Admitted by ClusterQueue q1",
},
metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
Reason: "ByTest",
Message: "Admitted by ClusterQueue q1",
}).
Obj(),
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "ns", Name: "wl"},
EventType: "Warning",
Reason: "AdmissionCheckRejected",
Message: "Deactivating workload because AdmissionCheck for check was Rejected: ",
},
},
},
"admitted workload with retry checks": {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRetry,
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid").
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRetry,
}).
Condition(metav1.Condition{
Type: "Evicted",
Status: "True",
Expand All @@ -511,8 +549,8 @@ func TestReconcile(t *testing.T) {
Obj(),
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Name: "wl", Namespace: "ns"},
EventType: corev1.EventTypeNormal,
Key: types.NamespacedName{Namespace: "ns", Name: "wl"},
EventType: "Normal",
Reason: "EvictedDueToAdmissionCheck",
Message: "At least one admission check is false",
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
if s.cache.IsAssumedOrAdmittedWorkload(w) {
log.Info("Workload skipped from admission because it's already assumed or admitted", "workload", klog.KObj(w.Obj))
continue
} else if workload.HasRetryOrRejectedChecks(w.Obj) {
} else if workload.HasRetryChecks(w.Obj) || workload.HasRejectedChecks(w.Obj) {
e.inadmissibleMsg = "The workload has failed admission checks"
} else if snap.InactiveClusterQueueSets.Has(w.ClusterQueue) {
e.inadmissibleMsg = fmt.Sprintf("ClusterQueue %s is inactive", w.ClusterQueue)
Expand Down
14 changes: 14 additions & 0 deletions pkg/util/testing/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,17 @@ func CheckLatestEvent(ctx context.Context, k8sClient client.Client,

return false, fmt.Errorf("mismatch with the latest event: got r:%v t:%v n:%v, reg %v", item.Reason, item.Type, item.Note, item.Regarding)
}

// HasEventAppeared returns if an event has been emitted
func HasEventAppeared(ctx context.Context, k8sClient client.Client, event corev1.Event) (bool, error) {
events := &corev1.EventList{}
if err := k8sClient.List(ctx, events, &client.ListOptions{}); err != nil {
return false, err
}
for _, item := range events.Items {
if item.Reason == event.Reason && item.Type == event.Type && item.Message == event.Message {
return true, nil
}
}
return false, nil
}
25 changes: 18 additions & 7 deletions pkg/workload/admissionchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func SetAdmissionCheckState(checks *[]kueue.AdmissionCheckState, newCheck kueue.
existingCondition.PodSetUpdates = newCheck.PodSetUpdates
}

// GetRejectedChecks returns the list of Rejected admission checks
func GetRejectedChecks(wl *kueue.Workload) []string {
rejectedChecks := make([]string, 0, len(wl.Status.AdmissionChecks))
// RejectedChecks returns the list of Rejected admission checks
func RejectedChecks(wl *kueue.Workload) []kueue.AdmissionCheckState {
rejectedChecks := make([]kueue.AdmissionCheckState, 0, len(wl.Status.AdmissionChecks))
for i := range wl.Status.AdmissionChecks {
ac := wl.Status.AdmissionChecks[i]
if ac.State == kueue.CheckStateRejected {
rejectedChecks = append(rejectedChecks, ac.Name)
rejectedChecks = append(rejectedChecks, ac)
}
}
return rejectedChecks
Expand Down Expand Up @@ -138,11 +138,22 @@ func HasAllChecks(wl *kueue.Workload, mustHaveChecks sets.Set[string]) bool {
return mustHaveChecks.Len() == 0
}

// HasRetryOrRejectedChecks returns true if any of the workloads checks are Retry or Rejected
func HasRetryOrRejectedChecks(wl *kueue.Workload) bool {
// HasRetryChecks returns true if any of the workloads checks is Retry
func HasRetryChecks(wl *kueue.Workload) bool {
for i := range wl.Status.AdmissionChecks {
state := wl.Status.AdmissionChecks[i].State
if state == kueue.CheckStateRetry || state == kueue.CheckStateRejected {
if state == kueue.CheckStateRetry {
return true
}
}
return false
}

// HasRejectedChecks returns true if any of the workloads checks is Rejected
func HasRejectedChecks(wl *kueue.Workload) bool {
for i := range wl.Status.AdmissionChecks {
state := wl.Status.AdmissionChecks[i].State
if state == kueue.CheckStateRejected {
return true
}
}
Expand Down
Loading

0 comments on commit 6ecaa87

Please sign in to comment.