Skip to content

Commit

Permalink
Provide support for ProvisioningRequest's BookingExpired condition (#…
Browse files Browse the repository at this point in the history
…2445)

* Add support for BookingExpired

* Add integration tests for BookingExpired

* Improve naming

* Fix tests flakiness

* Update pkg/controller/admissionchecks/provisioning/provisioning.go

Co-authored-by: Yaroslava Serdiuk <yaroslava@google.com>

* Import metav1

* Minor refactor

* Rebase changes

* Fix setting maxRetries parameter after rebase

* Use BeComparableTo(), add additonal descriptions

* Improve integrations tests

* Use BeComparableTo with cmp.Options

* Improve integration tests

* Rebase changes

* Update pkg/controller/admissionchecks/provisioning/controller.go

Co-authored-by: Yuki Iwai <yuki.iwai.tz@gmail.com>

* Refactor to avoid nested Eventually()

---------

Co-authored-by: Yaroslava Serdiuk <yaroslava@google.com>
Co-authored-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
  • Loading branch information
3 people committed Jul 10, 2024
1 parent 9f36a46 commit a3688c4
Show file tree
Hide file tree
Showing 3 changed files with 537 additions and 47 deletions.
33 changes: 26 additions & 7 deletions pkg/controller/admissionchecks/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,11 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
attempt := int32(1)
shouldCreatePr := false
if exists {
if isFailed(oldPr) {
if isFailed(oldPr) || (isBookingExpired(oldPr) && !workload.IsAdmitted(wl)) {
// if the workload is Admitted we don't want to retry on BookingExpired
attempt = getAttempt(log, oldPr, wl.Name, checkName)
if attempt <= c.maxRetries {
prFailed := apimeta.FindStatusCondition(oldPr.Status.Conditions, autoscaling.Failed)
remainingTime := c.remainingTime(attempt, prFailed.LastTransitionTime.Time)
remainingTime := c.remainingTimeToRetry(oldPr, attempt)
if remainingTime <= 0 {
shouldCreatePr = true
attempt += 1
Expand Down Expand Up @@ -310,17 +310,23 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
return requeAfter, nil
}

func (c *Controller) remainingTime(failuresCount int32, lastFailureTime time.Time) time.Duration {
func (c *Controller) remainingTimeToRetry(pr *autoscaling.ProvisioningRequest, failuresCount int32) time.Duration {
backoffDuration := time.Duration(c.minBackoffSeconds) * time.Second
maxBackoffDuration := time.Duration(c.maxBackoffSeconds) * time.Second
var cond *metav1.Condition
if isFailed(pr) {
cond = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed)
} else {
cond = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired)
}
for i := 1; i < int(failuresCount); i++ {
backoffDuration *= 2
if backoffDuration >= maxBackoffDuration {
backoffDuration = maxBackoffDuration
break
}
}
timeElapsedSinceLastFailure := time.Since(lastFailureTime)
timeElapsedSinceLastFailure := time.Since(cond.LastTransitionTime.Time)
return backoffDuration - timeElapsedSinceLastFailure
}

Expand Down Expand Up @@ -516,12 +522,26 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
}
case isCapacityRevoked(pr):
if workload.IsActive(wl) && !workload.IsFinished(wl) {
// We mark the admission check as rejected to trigger workload eviction.
// We mark the admission check as rejected to trigger workload deactivation.
// This is needed to prevent replacement pods being stuck in the pending phase indefinitely
// as the nodes are already deleted by Cluster Autoscaler.
updated = updateCheckState(&checkState, kueue.CheckStateRejected) || updated
updated = updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.CapacityRevoked).Message) || updated
}
case isBookingExpired(pr):
if !workload.IsAdmitted(wl) {
attempt := getAttempt(log, pr, wl.Name, check)
if attempt <= c.maxRetries {
// it is going to be retried
message := fmt.Sprintf("Retrying after booking expired: %s", apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired).Message)
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
updated = updateCheckMessage(&checkState, message) || updated
} else {
updated = true
checkState.State = kueue.CheckStateRejected
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired).Message
}
}
case isProvisioned(pr):
if updateCheckState(&checkState, kueue.CheckStateReady) {
updated = true
Expand Down Expand Up @@ -550,7 +570,6 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
}
recorderMessages = append(recorderMessages, message)
}

workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, checkState)
}
if updated {
Expand Down
188 changes: 187 additions & 1 deletion pkg/controller/admissionchecks/provisioning/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package provisioning

import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -949,6 +950,191 @@ func TestReconcile(t *testing.T) {
Obj(),
},
},
"workload does nothing when admitted and receives the provisioning request's BookingExpired condition": {
workload: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).
Admitted(true).
Obj(),
checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()},
flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()},
configs: []kueue.ProvisioningRequestConfig{
{
ObjectMeta: metav1.ObjectMeta{
Name: "config1",
},
Spec: kueue.ProvisioningRequestConfigSpec{
ProvisioningClassName: "class1",
},
},
},
requests: []autoscaling.ProvisioningRequest{
*requestWithConditions(baseRequest,
[]metav1.Condition{
{
Type: autoscaling.Failed,
Status: metav1.ConditionFalse,
},
{
Type: autoscaling.Provisioned,
Status: metav1.ConditionTrue,
},
{
Type: autoscaling.Accepted,
Status: metav1.ConditionTrue,
},
{
Type: autoscaling.BookingExpired,
Status: metav1.ConditionTrue,
},
}),
},
wantWorkloads: map[string]*kueue.Workload{
baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).
Admitted(true).
Obj(),
},
},
"workloads retries the admission check when is not admitted and receives the provisioning request's BookingExpired condition": {
workload: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).
Admitted(false).
Obj(),
checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()},
flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()},
configs: []kueue.ProvisioningRequestConfig{
{
ObjectMeta: metav1.ObjectMeta{
Name: "config1",
},
Spec: kueue.ProvisioningRequestConfigSpec{
ProvisioningClassName: "class1",
},
},
},
maxRetries: 1,
requests: []autoscaling.ProvisioningRequest{
*requestWithConditions(baseRequest,
[]metav1.Condition{
{
Type: autoscaling.Failed,
Status: metav1.ConditionFalse,
},
{
Type: autoscaling.Provisioned,
Status: metav1.ConditionTrue,
},
{
Type: autoscaling.Accepted,
Status: metav1.ConditionTrue,
},
{
Type: autoscaling.BookingExpired,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(time.Now().Add(time.Hour * (-1))),
},
}),
},
wantWorkloads: map[string]*kueue.Workload{
baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).
AdmissionChecks(kueue.AdmissionCheckState{
Name: "check1",
State: kueue.CheckStatePending,
Message: "Retrying after booking expired: ",
}, kueue.AdmissionCheckState{
Name: "not-provisioning",
State: kueue.CheckStatePending,
}).
Admitted(false).
Obj(),
},
wantRequests: map[string]*autoscaling.ProvisioningRequest{
ProvisioningRequestName("wl", baseCheck.Name, 2): &autoscaling.ProvisioningRequest{
ObjectMeta: metav1.ObjectMeta{
Namespace: TestNamespace,
Name: "wl-check1-2",
OwnerReferences: []metav1.OwnerReference{
{
Name: "wl",
},
},
},
Spec: autoscaling.ProvisioningRequestSpec{
PodSets: []autoscaling.PodSet{
{
PodTemplateRef: autoscaling.Reference{
Name: "ppt-wl-check1-2-ps1",
},
Count: 4,
},
{
PodTemplateRef: autoscaling.Reference{
Name: "ppt-wl-check1-2-ps2",
},
Count: 3,
},
},
ProvisioningClassName: "class1",
},
},
},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "ns", Name: "wl"},
EventType: corev1.EventTypeNormal,
Reason: "ProvisioningRequestCreated",
Message: `Created ProvisioningRequest: "wl-check1-2"`,
},
},
},
"workloads reject the admission check when is not admitted and receives the provisioning request's BookingExpired condition": {
workload: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).
Admitted(false).
Obj(),
checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()},
flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()},
configs: []kueue.ProvisioningRequestConfig{
{
ObjectMeta: metav1.ObjectMeta{
Name: "config1",
},
Spec: kueue.ProvisioningRequestConfigSpec{
ProvisioningClassName: "class1",
},
},
},
maxRetries: 0,
requests: []autoscaling.ProvisioningRequest{
*requestWithConditions(baseRequest,
[]metav1.Condition{
{
Type: autoscaling.Failed,
Status: metav1.ConditionFalse,
},
{
Type: autoscaling.Provisioned,
Status: metav1.ConditionTrue,
},
{
Type: autoscaling.Accepted,
Status: metav1.ConditionTrue,
},
{
Type: autoscaling.BookingExpired,
Status: metav1.ConditionTrue,
},
}),
},
wantWorkloads: map[string]*kueue.Workload{
baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).
AdmissionChecks(kueue.AdmissionCheckState{
Name: "check1",
State: kueue.CheckStateRejected,
}, kueue.AdmissionCheckState{
Name: "not-provisioning",
State: kueue.CheckStatePending,
}).
Admitted(false).
Obj(),
},
},
}

for name, tc := range cases {
Expand Down Expand Up @@ -1003,7 +1189,7 @@ func TestReconcile(t *testing.T) {
for name, wantRequest := range tc.wantRequests {
gotRequest := &autoscaling.ProvisioningRequest{}
if err := k8sclient.Get(ctx, types.NamespacedName{Namespace: TestNamespace, Name: name}, gotRequest); err != nil {
t.Errorf("unexpected error getting request %q", name)
t.Errorf("unexpected error getting request %q: %s", name, err)
}

if diff := cmp.Diff(wantRequest, gotRequest, reqCmpOptions...); diff != "" {
Expand Down
Loading

0 comments on commit a3688c4

Please sign in to comment.