Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide support for ProvisioningRequest's BookingExpired condition #2445

Merged
merged 16 commits into from
Jul 10, 2024
Merged
33 changes: 26 additions & 7 deletions pkg/controller/admissionchecks/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,11 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
attempt := int32(1)
shouldCreatePr := false
if exists {
if isFailed(oldPr) {
if isFailed(oldPr) || (isBookingExpired(oldPr) && !workload.IsAdmitted(wl)) {
// if the workload is Admitted we don't want to retry on BookingExpired
attempt = getAttempt(log, oldPr, wl.Name, checkName)
if attempt <= c.maxRetries {
prFailed := apimeta.FindStatusCondition(oldPr.Status.Conditions, autoscaling.Failed)
remainingTime := c.remainingTime(attempt, prFailed.LastTransitionTime.Time)
remainingTime := c.remainingTimeToRetry(oldPr, attempt)
if remainingTime <= 0 {
shouldCreatePr = true
attempt += 1
Expand Down Expand Up @@ -310,17 +310,23 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
return requeAfter, nil
}

// remainingTimeToRetry reports how much of the retry backoff window is still
// left for the given ProvisioningRequest.
//
// The backoff starts at c.minBackoffSeconds, is doubled once for every failure
// after the first (failuresCount - 1 doublings), and is clamped to
// c.maxBackoffSeconds. The time elapsed since the LastTransitionTime of the
// relevant condition is then subtracted: the Failed condition when isFailed(pr)
// holds, otherwise the BookingExpired condition.
//
// A result that is zero or negative means the backoff window has already
// passed; the visible caller in syncOwnedProvisionRequest creates the next
// ProvisioningRequest attempt in that case.
//
// NOTE(review): this listing appears to be a diff view, so the remainingTime
// signature and the lastFailureTime-based line below look like the pre-change
// versions of the lines that immediately follow them — confirm against the
// merged file.
func (c *Controller) remainingTime(failuresCount int32, lastFailureTime time.Time) time.Duration {
func (c *Controller) remainingTimeToRetry(pr *autoscaling.ProvisioningRequest, failuresCount int32) time.Duration {
backoffDuration := time.Duration(c.minBackoffSeconds) * time.Second
maxBackoffDuration := time.Duration(c.maxBackoffSeconds) * time.Second
// Pick the condition whose transition time marks the start of the backoff.
var cond *metav1.Condition
if isFailed(pr) {
cond = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed)
} else {
cond = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired)
}
// Exponential growth: one doubling per failure beyond the first, stopping
// early once the configured maximum is reached.
for i := 1; i < int(failuresCount); i++ {
backoffDuration *= 2
if backoffDuration >= maxBackoffDuration {
backoffDuration = maxBackoffDuration
break
}
}
timeElapsedSinceLastFailure := time.Since(lastFailureTime)
// NOTE(review): cond is dereferenced without a nil check; this presumably
// relies on the caller invoking the method only when the request is failed
// or booking-expired — verify that FindStatusCondition cannot return nil here.
timeElapsedSinceLastFailure := time.Since(cond.LastTransitionTime.Time)
return backoffDuration - timeElapsedSinceLastFailure
}

Expand Down Expand Up @@ -516,12 +522,26 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
}
case isCapacityRevoked(pr):
if workload.IsActive(wl) && !workload.IsFinished(wl) {
// We mark the admission check as rejected to trigger workload eviction.
// We mark the admission check as rejected to trigger workload deactivation.
// This is needed to prevent replacement pods being stuck in the pending phase indefinitely
// as the nodes are already deleted by Cluster Autoscaler.
updated = updateCheckState(&checkState, kueue.CheckStateRejected) || updated
updated = updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.CapacityRevoked).Message) || updated
}
case isBookingExpired(pr):
PBundyra marked this conversation as resolved.
Show resolved Hide resolved
if !workload.IsAdmitted(wl) {
attempt := getAttempt(log, pr, wl.Name, check)
if attempt <= c.maxRetries {
// it is going to be retried
message := fmt.Sprintf("Retrying after booking expired: %s", apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired).Message)
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
updated = updateCheckMessage(&checkState, message) || updated
} else {
updated = true
checkState.State = kueue.CheckStateRejected
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired).Message
}
}
case isProvisioned(pr):
if updateCheckState(&checkState, kueue.CheckStateReady) {
updated = true
Expand Down Expand Up @@ -550,7 +570,6 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
}
recorderMessages = append(recorderMessages, message)
}

workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, checkState)
}
if updated {
Expand Down
188 changes: 187 additions & 1 deletion pkg/controller/admissionchecks/provisioning/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package provisioning

import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -949,6 +950,191 @@ func TestReconcile(t *testing.T) {
Obj(),
},
},
"workload does nothing when admitted and receives the provisioning request's BookingExpired condition": {
workload: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).
Admitted(true).
Obj(),
checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()},
flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()},
configs: []kueue.ProvisioningRequestConfig{
{
ObjectMeta: metav1.ObjectMeta{
Name: "config1",
},
Spec: kueue.ProvisioningRequestConfigSpec{
ProvisioningClassName: "class1",
},
},
},
requests: []autoscaling.ProvisioningRequest{
*requestWithConditions(baseRequest,
[]metav1.Condition{
{
Type: autoscaling.Failed,
Status: metav1.ConditionFalse,
},
{
Type: autoscaling.Provisioned,
Status: metav1.ConditionTrue,
},
{
Type: autoscaling.Accepted,
Status: metav1.ConditionTrue,
},
{
Type: autoscaling.BookingExpired,
Status: metav1.ConditionTrue,
},
}),
},
wantWorkloads: map[string]*kueue.Workload{
baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).
Admitted(true).
Obj(),
},
},
"workloads retries the admission check when is not admitted and receives the provisioning request's BookingExpired condition": {
workload: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).
Admitted(false).
Obj(),
checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()},
flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()},
configs: []kueue.ProvisioningRequestConfig{
{
ObjectMeta: metav1.ObjectMeta{
Name: "config1",
},
Spec: kueue.ProvisioningRequestConfigSpec{
ProvisioningClassName: "class1",
},
},
},
maxRetries: 1,
requests: []autoscaling.ProvisioningRequest{
*requestWithConditions(baseRequest,
[]metav1.Condition{
{
Type: autoscaling.Failed,
Status: metav1.ConditionFalse,
},
{
Type: autoscaling.Provisioned,
Status: metav1.ConditionTrue,
},
{
Type: autoscaling.Accepted,
Status: metav1.ConditionTrue,
},
{
Type: autoscaling.BookingExpired,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(time.Now().Add(time.Hour * (-1))),
},
}),
},
wantWorkloads: map[string]*kueue.Workload{
baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).
AdmissionChecks(kueue.AdmissionCheckState{
Name: "check1",
State: kueue.CheckStatePending,
Message: "Retrying after booking expired: ",
}, kueue.AdmissionCheckState{
Name: "not-provisioning",
State: kueue.CheckStatePending,
}).
Admitted(false).
Obj(),
},
wantRequests: map[string]*autoscaling.ProvisioningRequest{
ProvisioningRequestName("wl", baseCheck.Name, 2): &autoscaling.ProvisioningRequest{
ObjectMeta: metav1.ObjectMeta{
Namespace: TestNamespace,
Name: "wl-check1-2",
OwnerReferences: []metav1.OwnerReference{
{
Name: "wl",
},
},
},
Spec: autoscaling.ProvisioningRequestSpec{
PodSets: []autoscaling.PodSet{
{
PodTemplateRef: autoscaling.Reference{
Name: "ppt-wl-check1-2-ps1",
},
Count: 4,
},
{
PodTemplateRef: autoscaling.Reference{
Name: "ppt-wl-check1-2-ps2",
},
Count: 3,
},
},
ProvisioningClassName: "class1",
},
},
},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "ns", Name: "wl"},
EventType: corev1.EventTypeNormal,
Reason: "ProvisioningRequestCreated",
Message: `Created ProvisioningRequest: "wl-check1-2"`,
},
},
},
"workloads reject the admission check when is not admitted and receives the provisioning request's BookingExpired condition": {
workload: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).
Admitted(false).
Obj(),
checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()},
flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()},
configs: []kueue.ProvisioningRequestConfig{
{
ObjectMeta: metav1.ObjectMeta{
Name: "config1",
},
Spec: kueue.ProvisioningRequestConfigSpec{
ProvisioningClassName: "class1",
},
},
},
maxRetries: 0,
requests: []autoscaling.ProvisioningRequest{
*requestWithConditions(baseRequest,
[]metav1.Condition{
{
Type: autoscaling.Failed,
Status: metav1.ConditionFalse,
},
{
Type: autoscaling.Provisioned,
Status: metav1.ConditionTrue,
},
{
Type: autoscaling.Accepted,
Status: metav1.ConditionTrue,
},
{
Type: autoscaling.BookingExpired,
Status: metav1.ConditionTrue,
},
}),
},
wantWorkloads: map[string]*kueue.Workload{
baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).
AdmissionChecks(kueue.AdmissionCheckState{
Name: "check1",
State: kueue.CheckStateRejected,
}, kueue.AdmissionCheckState{
Name: "not-provisioning",
State: kueue.CheckStatePending,
}).
Admitted(false).
Obj(),
},
},
}

for name, tc := range cases {
Expand Down Expand Up @@ -1003,7 +1189,7 @@ func TestReconcile(t *testing.T) {
for name, wantRequest := range tc.wantRequests {
gotRequest := &autoscaling.ProvisioningRequest{}
if err := k8sclient.Get(ctx, types.NamespacedName{Namespace: TestNamespace, Name: name}, gotRequest); err != nil {
t.Errorf("unexpected error getting request %q", name)
t.Errorf("unexpected error getting request %q: %s", name, err)
}

if diff := cmp.Diff(wantRequest, gotRequest, reqCmpOptions...); diff != "" {
Expand Down
Loading