Skip to content

Commit

Permalink
Provide support for ProvisioningRequest's CapacityRevoked condition (k…
Browse files Browse the repository at this point in the history
…ubernetes-sigs#2196)

* Provide support for ProvisioningRequest's CapacityRevoked condition

* Clean-up

* Improve naming

* Add an unit test case, cleanup logs, slight logic changes to provisioning controller

* Clean up, improve integration tests for provisioning request, set Evicted condition for Workload if the capacity for a ProvisioningRequest has been revoked

* Add controllers to ProvisioningRequest integration tests, check evicted metric in integration tests, do not set evicted condition in the provisioning controller

* Fix integration tests

* Delete unit test that checks if that Evicted conditions stays the same

* Discard deactivating workload in the provisioning controller

* Batch workload status updates

* Update pkg/controller/admissionchecks/provisioning/controller.go

Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com>

* Use IsActive function, clean up provisioning integration tests

---------

Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com>
  • Loading branch information
2 people authored and Fiona-Waters committed Jun 25, 2024
1 parent ab8c352 commit 91b35d2
Show file tree
Hide file tree
Showing 11 changed files with 562 additions and 238 deletions.
173 changes: 28 additions & 145 deletions pkg/controller/admissionchecks/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ import (
"errors"
"fmt"
"maps"
"regexp"
"strconv"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
Expand All @@ -42,11 +39,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/util/api"
Expand Down Expand Up @@ -111,17 +106,14 @@ func NewController(client client.Client, record record.EventRecorder) (*Controll
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
wl := &kueue.Workload{}
log := ctrl.LoggerFrom(ctx)
log.V(2).Info("Reconcile workload")

err := c.client.Get(ctx, req.NamespacedName, wl)
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

if !workload.HasQuotaReservation(wl) || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
// 1.2 workload has no reservation or is finished
log.V(5).Info("workload with no reservation, delete owned requests")
return reconcile.Result{}, c.deleteOwnedProvisionRequests(ctx, req.Namespace, req.Name)
if !workload.HasQuotaReservation(wl) || workload.IsFinished(wl) {
return reconcile.Result{}, nil
}

// get the lists of relevant checks
Expand All @@ -137,11 +129,9 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
ownedPrs := list.Items
activeOrLastPRForChecks := c.activeOrLastPRForChecks(ctx, wl, relevantChecks, ownedPrs)

if workload.IsAdmitted(wl) {
// check the state of the provision requests, eventually toggle the checks to false
// otherwise there is nothing to here
log.V(5).Info("workload admitted, sync checks")
return reconcile.Result{}, c.syncCheckStates(ctx, wl, relevantChecks, activeOrLastPRForChecks)
err = c.syncCheckStates(ctx, wl, relevantChecks, activeOrLastPRForChecks)
if err != nil {
return reconcile.Result{}, err
}

err = c.deleteUnusedProvisioningRequests(ctx, ownedPrs, activeOrLastPRForChecks)
Expand All @@ -157,10 +147,6 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
return reconcile.Result{}, err
}

err = c.syncCheckStates(ctx, wl, relevantChecks, activeOrLastPRForChecks)
if err != nil {
return reconcile.Result{}, err
}
if requeAfter != nil {
return reconcile.Result{RequeueAfter: *requeAfter}, nil
}
Expand All @@ -169,14 +155,15 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco

func (c *Controller) activeOrLastPRForChecks(ctx context.Context, wl *kueue.Workload, relevantChecks []string, ownedPrs []autoscaling.ProvisioningRequest) map[string]*autoscaling.ProvisioningRequest {
activeOrLastPRForChecks := make(map[string]*autoscaling.ProvisioningRequest)
log := ctrl.LoggerFrom(ctx)
for _, checkName := range relevantChecks {
for i := range ownedPrs {
req := &ownedPrs[i]
// PRs relevant for the admission check
if matches(req, wl.Name, checkName) {
if matchesWorkloadAndCheck(req, wl.Name, checkName) {
prc, err := c.helper.ConfigForAdmissionCheck(ctx, checkName)
if err == nil && c.reqIsNeeded(ctx, wl, prc) && provReqSyncedWithConfig(req, prc) {
if currPr, exists := activeOrLastPRForChecks[checkName]; !exists || getAttempt(ctx, currPr, wl.Name, checkName) < getAttempt(ctx, req, wl.Name, checkName) {
if currPr, exists := activeOrLastPRForChecks[checkName]; !exists || getAttempt(log, currPr, wl.Name, checkName) < getAttempt(log, req, wl.Name, checkName) {
activeOrLastPRForChecks[checkName] = req
}
}
Expand Down Expand Up @@ -204,20 +191,6 @@ func (c *Controller) deleteUnusedProvisioningRequests(ctx context.Context, owned
return nil
}

func (c *Controller) deleteOwnedProvisionRequests(ctx context.Context, namespace string, name string) error {
list := &autoscaling.ProvisioningRequestList{}
if err := c.client.List(ctx, list, client.InNamespace(namespace), client.MatchingFields{RequestsOwnedByWorkloadKey: name}); err != nil {
return client.IgnoreNotFound(err)
}

for i := range list.Items {
if err := c.client.Delete(ctx, &list.Items[i]); client.IgnoreNotFound(err) != nil {
return fmt.Errorf("delete requests for %s/%s: %w", namespace, name, err)
}
}
return nil
}

func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Workload, relevantChecks []string, activeOrLastPRForChecks map[string]*autoscaling.ProvisioningRequest) (*time.Duration, error) {
log := ctrl.LoggerFrom(ctx)
var requeAfter *time.Duration
Expand All @@ -240,8 +213,9 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
attempt := int32(1)
shouldCreatePr := false
if exists {
attempt = getAttempt(ctx, oldPr, wl.Name, checkName)
attempt = getAttempt(log, oldPr, wl.Name, checkName)
if apimeta.IsStatusConditionTrue(oldPr.Status.Conditions, autoscaling.Failed) {
attempt = getAttempt(log, oldPr, wl.Name, checkName)
if attempt <= MaxRetries {
prFailed := apimeta.FindStatusCondition(oldPr.Status.Conditions, autoscaling.Failed)
remainingTime := remainingTime(prc, attempt, prFailed.LastTransitionTime.Time)
Expand All @@ -256,7 +230,7 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
} else {
shouldCreatePr = true
}
requestName := GetProvisioningRequestName(wl.Name, checkName, attempt)
requestName := ProvisioningRequestName(wl.Name, checkName, attempt)
if shouldCreatePr {
log.V(3).Info("Creating ProvisioningRequest", "requestName", requestName, "attempt", attempt)
req := &autoscaling.ProvisioningRequest{
Expand Down Expand Up @@ -433,43 +407,6 @@ func containerUses(cont *corev1.Container, resourceSet sets.Set[corev1.ResourceN
return false
}

func parametersKueueToProvisioning(in map[string]kueue.Parameter) map[string]autoscaling.Parameter {
if in == nil {
return nil
}

out := make(map[string]autoscaling.Parameter, len(in))
for k, v := range in {
out[k] = autoscaling.Parameter(v)
}
return out
}

// provReqSyncedWithConfig checks if the provisioning request has the same provisioningClassName as the provisioning request config
// and contains all the parameters from the config
func provReqSyncedWithConfig(req *autoscaling.ProvisioningRequest, prc *kueue.ProvisioningRequestConfig) bool {
if req.Spec.ProvisioningClassName != prc.Spec.ProvisioningClassName {
return false
}
for k, vCfg := range prc.Spec.Parameters {
if vReq, found := req.Spec.Parameters[k]; !found || string(vReq) != string(vCfg) {
return false
}
}
return true
}

// passProvReqParams extracts from Workload's annotations ones that should be passed to ProvisioningRequest
func passProvReqParams(wl *kueue.Workload, req *autoscaling.ProvisioningRequest) {
if req.Spec.Parameters == nil {
req.Spec.Parameters = make(map[string]autoscaling.Parameter, 0)
}
for annotation, val := range admissioncheck.FilterProvReqAnnotations(wl.Annotations) {
paramName := strings.TrimPrefix(annotation, constants.ProvReqAnnotationPrefix)
req.Spec.Parameters[paramName] = autoscaling.Parameter(val)
}
}

func updateCheckMessage(checkState *kueue.AdmissionCheckState, message string) bool {
if message == "" || checkState.Message == message {
return false
Expand Down Expand Up @@ -510,20 +447,18 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
return nil
}

prFailed := apimeta.IsStatusConditionTrue(pr.Status.Conditions, autoscaling.Failed)
prProvisioned := apimeta.IsStatusConditionTrue(pr.Status.Conditions, autoscaling.Provisioned)
prAccepted := apimeta.IsStatusConditionTrue(pr.Status.Conditions, autoscaling.Accepted)
log.V(3).Info("Synchronizing admission check state based on provisioning request", "wl", klog.KObj(wl),
"check", check,
"prName", pr.Name,
"failed", prFailed,
"provisioned", prProvisioned,
"accepted", prAccepted)

"failed", isFailed(pr),
"provisioned", isProvisioned(pr),
"accepted", isAccepted(pr),
"bookingExpired", isBookingExpired(pr),
"capacityRevoked", isCapacityRevoked(pr))
switch {
case prFailed:
case isFailed(pr):
if checkState.State != kueue.CheckStateRejected {
if attempt := getAttempt(ctx, pr, wl.Name, check); attempt <= MaxRetries {
if attempt := getAttempt(log, pr, wl.Name, check); attempt <= MaxRetries {
// it is going to be retried
message := fmt.Sprintf("Retrying after failure: %s", apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message)
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
Expand All @@ -534,7 +469,15 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message
}
}
case prProvisioned:
case isCapacityRevoked(pr):
if workload.IsActive(wl) && !workload.IsFinished(wl) {
// We mark the admission check as rejected to trigger workload eviction.
// This is needed to prevent replacement pods being stuck in the pending phase indefinitely
// as the nodes are already deleted by Cluster Autoscaler.
updated = updateCheckState(&checkState, kueue.CheckStateRejected) || updated
updated = updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.CapacityRevoked).Message) || updated
}
case isProvisioned(pr):
if updateCheckState(&checkState, kueue.CheckStateReady) {
updated = true
// add the pod podSetUpdates
Expand All @@ -543,7 +486,7 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
// to change to the "successfully provisioned" message after provisioning
updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Provisioned).Message)
}
case prAccepted:
case isAccepted(pr):
if provisionedCond := apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Provisioned); provisionedCond != nil {
// propagate the ETA update from the provisioning request into the workload
updated = updateCheckMessage(&checkState, provisionedCond.Message) || updated
Expand Down Expand Up @@ -767,21 +710,6 @@ func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {
Complete(acReconciler)
}

func GetProvisioningRequestName(workloadName, checkName string, attempt int32) string {
fullName := fmt.Sprintf("%s-%s-%d", workloadName, checkName, int(attempt))
return limitObjectName(fullName)
}

func getProvisioningRequestNamePrefix(workloadName, checkName string) string {
fullName := fmt.Sprintf("%s-%s-", workloadName, checkName)
return limitObjectName(fullName)
}

func getProvisioningRequestPodTemplateName(prName, podsetName string) string {
fullName := fmt.Sprintf("%s-%s-%s", podTemplatesPrefix, prName, podsetName)
return limitObjectName(fullName)
}

func limitObjectName(fullName string) string {
if len(fullName) <= objNameMaxPrefixLength {
return fullName
Expand All @@ -791,48 +719,3 @@ func limitObjectName(fullName string) string {
hashBytes := hex.EncodeToString(h.Sum(nil))
return fmt.Sprintf("%s-%s", fullName[:objNameMaxPrefixLength], hashBytes[:objNameHashLength])
}

func matches(pr *autoscaling.ProvisioningRequest, workloadName, checkName string) bool {
attemptRegex := getAttemptRegex(workloadName, checkName)
matches := attemptRegex.FindStringSubmatch(pr.Name)
return len(matches) > 0
}

func getAttempt(ctx context.Context, pr *autoscaling.ProvisioningRequest, workloadName, checkName string) int32 {
logger := log.FromContext(ctx)
attemptRegex := getAttemptRegex(workloadName, checkName)
matches := attemptRegex.FindStringSubmatch(pr.Name)
if len(matches) > 0 {
number, err := strconv.Atoi(matches[1])
if err != nil {
logger.Error(err, "Parsing the attempt number from provisioning request", "requestName", pr.Name)
return 1
} else {
return int32(number)
}
} else {
logger.Info("No attempt suffix in provisioning request", "requestName", pr.Name)
return 1
}
}

func getAttemptRegex(workloadName, checkName string) *regexp.Regexp {
prefix := getProvisioningRequestNamePrefix(workloadName, checkName)
escapedPrefix := regexp.QuoteMeta(prefix)
return regexp.MustCompile("^" + escapedPrefix + "([0-9]+)$")
}

func remainingTime(prc *kueue.ProvisioningRequestConfig, failuresCount int32, lastFailureTime time.Time) time.Duration {
defaultBackoff := time.Duration(MinBackoffSeconds) * time.Second
maxBackoff := 30 * time.Minute
backoffDuration := defaultBackoff
for i := 1; i < int(failuresCount); i++ {
backoffDuration *= 2
if backoffDuration >= maxBackoff {
backoffDuration = maxBackoff
break
}
}
timeElapsedSinceLastFailure := time.Since(lastFailureTime)
return backoffDuration - timeElapsedSinceLastFailure
}
Loading

0 comments on commit 91b35d2

Please sign in to comment.