Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide support for ProvisioningRequest's CapacityRevoked condition #2196

Merged
merged 12 commits into from
Jun 7, 2024
Merged
173 changes: 28 additions & 145 deletions pkg/controller/admissionchecks/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ import (
"errors"
"fmt"
"maps"
"regexp"
"strconv"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
Expand All @@ -42,11 +39,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/util/api"
Expand Down Expand Up @@ -111,17 +106,14 @@ func NewController(client client.Client, record record.EventRecorder) (*Controll
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
wl := &kueue.Workload{}
log := ctrl.LoggerFrom(ctx)
log.V(2).Info("Reconcile workload")

err := c.client.Get(ctx, req.NamespacedName, wl)
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

if !workload.HasQuotaReservation(wl) || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
// 1.2 workload has no reservation or is finished
log.V(5).Info("workload with no reservation, delete owned requests")
return reconcile.Result{}, c.deleteOwnedProvisionRequests(ctx, req.Namespace, req.Name)
if !workload.HasQuotaReservation(wl) || workload.IsFinished(wl) {
PBundyra marked this conversation as resolved.
Show resolved Hide resolved
return reconcile.Result{}, nil
}

// get the lists of relevant checks
Expand All @@ -137,11 +129,9 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
ownedPrs := list.Items
activeOrLastPRForChecks := c.activeOrLastPRForChecks(ctx, wl, relevantChecks, ownedPrs)

if workload.IsAdmitted(wl) {
// check the state of the provision requests, eventually toggle the checks to false
// otherwise there is nothing to do here
log.V(5).Info("workload admitted, sync checks")
return reconcile.Result{}, c.syncCheckStates(ctx, wl, relevantChecks, activeOrLastPRForChecks)
err = c.syncCheckStates(ctx, wl, relevantChecks, activeOrLastPRForChecks)
if err != nil {
return reconcile.Result{}, err
}

err = c.deleteUnusedProvisioningRequests(ctx, ownedPrs, activeOrLastPRForChecks)
PBundyra marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -157,10 +147,6 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
return reconcile.Result{}, err
}

err = c.syncCheckStates(ctx, wl, relevantChecks, activeOrLastPRForChecks)
if err != nil {
return reconcile.Result{}, err
}
if requeAfter != nil {
return reconcile.Result{RequeueAfter: *requeAfter}, nil
}
Expand All @@ -169,14 +155,15 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco

func (c *Controller) activeOrLastPRForChecks(ctx context.Context, wl *kueue.Workload, relevantChecks []string, ownedPrs []autoscaling.ProvisioningRequest) map[string]*autoscaling.ProvisioningRequest {
activeOrLastPRForChecks := make(map[string]*autoscaling.ProvisioningRequest)
log := ctrl.LoggerFrom(ctx)
for _, checkName := range relevantChecks {
for i := range ownedPrs {
req := &ownedPrs[i]
// PRs relevant for the admission check
if matches(req, wl.Name, checkName) {
if matchesWorkloadAndCheck(req, wl.Name, checkName) {
prc, err := c.helper.ConfigForAdmissionCheck(ctx, checkName)
if err == nil && c.reqIsNeeded(ctx, wl, prc) && provReqSyncedWithConfig(req, prc) {
if currPr, exists := activeOrLastPRForChecks[checkName]; !exists || getAttempt(ctx, currPr, wl.Name, checkName) < getAttempt(ctx, req, wl.Name, checkName) {
if currPr, exists := activeOrLastPRForChecks[checkName]; !exists || getAttempt(log, currPr, wl.Name, checkName) < getAttempt(log, req, wl.Name, checkName) {
activeOrLastPRForChecks[checkName] = req
}
}
Expand Down Expand Up @@ -204,20 +191,6 @@ func (c *Controller) deleteUnusedProvisioningRequests(ctx context.Context, owned
return nil
}

// deleteOwnedProvisionRequests lists the ProvisioningRequests in namespace
// whose RequestsOwnedByWorkloadKey field equals name and deletes each of them.
//
// Not-found errors from either the list or the delete calls are ignored.
// Any other list error is returned as-is; any other delete error is returned
// wrapped with the namespace/name of the owner, and stops the iteration.
func (c *Controller) deleteOwnedProvisionRequests(ctx context.Context, namespace string, name string) error {
	var owned autoscaling.ProvisioningRequestList
	listErr := c.client.List(
		ctx,
		&owned,
		client.InNamespace(namespace),
		client.MatchingFields{RequestsOwnedByWorkloadKey: name},
	)
	if listErr != nil {
		return client.IgnoreNotFound(listErr)
	}

	for idx := range owned.Items {
		delErr := c.client.Delete(ctx, &owned.Items[idx])
		if client.IgnoreNotFound(delErr) == nil {
			// Deleted, or already gone: move on to the next request.
			continue
		}
		return fmt.Errorf("delete requests for %s/%s: %w", namespace, name, delErr)
	}
	return nil
}

func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Workload, relevantChecks []string, activeOrLastPRForChecks map[string]*autoscaling.ProvisioningRequest) (*time.Duration, error) {
log := ctrl.LoggerFrom(ctx)
var requeAfter *time.Duration
Expand All @@ -240,8 +213,9 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
attempt := int32(1)
shouldCreatePr := false
if exists {
attempt = getAttempt(ctx, oldPr, wl.Name, checkName)
attempt = getAttempt(log, oldPr, wl.Name, checkName)
if apimeta.IsStatusConditionTrue(oldPr.Status.Conditions, autoscaling.Failed) {
attempt = getAttempt(log, oldPr, wl.Name, checkName)
if attempt <= MaxRetries {
prFailed := apimeta.FindStatusCondition(oldPr.Status.Conditions, autoscaling.Failed)
remainingTime := remainingTime(prc, attempt, prFailed.LastTransitionTime.Time)
Expand All @@ -256,7 +230,7 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
} else {
shouldCreatePr = true
}
requestName := GetProvisioningRequestName(wl.Name, checkName, attempt)
requestName := ProvisioningRequestName(wl.Name, checkName, attempt)
if shouldCreatePr {
log.V(3).Info("Creating ProvisioningRequest", "requestName", requestName, "attempt", attempt)
req := &autoscaling.ProvisioningRequest{
Expand Down Expand Up @@ -433,43 +407,6 @@ func containerUses(cont *corev1.Container, resourceSet sets.Set[corev1.ResourceN
return false
}

// parametersKueueToProvisioning converts a map of kueue.Parameter values into
// a map of autoscaling.Parameter values keyed identically.
// A nil input yields a nil result; a non-nil input always yields a fresh map.
func parametersKueueToProvisioning(in map[string]kueue.Parameter) map[string]autoscaling.Parameter {
	var converted map[string]autoscaling.Parameter
	if in != nil {
		converted = make(map[string]autoscaling.Parameter, len(in))
		for name, value := range in {
			converted[name] = autoscaling.Parameter(value)
		}
	}
	return converted
}

// provReqSyncedWithConfig reports whether req is in sync with prc: both must
// name the same provisioning class, and every parameter listed in the config
// must be present in the request with an equal value. Parameters that exist
// only on the request are not compared.
func provReqSyncedWithConfig(req *autoscaling.ProvisioningRequest, prc *kueue.ProvisioningRequestConfig) bool {
	sameClass := req.Spec.ProvisioningClassName == prc.Spec.ProvisioningClassName
	if !sameClass {
		return false
	}

	requestParams := req.Spec.Parameters
	for name, want := range prc.Spec.Parameters {
		got, present := requestParams[name]
		if present && string(got) == string(want) {
			continue
		}
		return false
	}
	return true
}

// passProvReqParams copies onto req.Spec.Parameters the Workload annotations
// selected by admissioncheck.FilterProvReqAnnotations. Each parameter name is
// the annotation key with constants.ProvReqAnnotationPrefix trimmed from its
// start. The parameters map is allocated when the request does not have one.
func passProvReqParams(wl *kueue.Workload, req *autoscaling.ProvisioningRequest) {
	params := req.Spec.Parameters
	if params == nil {
		params = make(map[string]autoscaling.Parameter)
		req.Spec.Parameters = params
	}

	selected := admissioncheck.FilterProvReqAnnotations(wl.Annotations)
	for key, value := range selected {
		name := strings.TrimPrefix(key, constants.ProvReqAnnotationPrefix)
		params[name] = autoscaling.Parameter(value)
	}
}

func updateCheckMessage(checkState *kueue.AdmissionCheckState, message string) bool {
if message == "" || checkState.Message == message {
return false
Expand Down Expand Up @@ -510,20 +447,18 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
return nil
}

prFailed := apimeta.IsStatusConditionTrue(pr.Status.Conditions, autoscaling.Failed)
prProvisioned := apimeta.IsStatusConditionTrue(pr.Status.Conditions, autoscaling.Provisioned)
prAccepted := apimeta.IsStatusConditionTrue(pr.Status.Conditions, autoscaling.Accepted)
log.V(3).Info("Synchronizing admission check state based on provisioning request", "wl", klog.KObj(wl),
"check", check,
"prName", pr.Name,
"failed", prFailed,
"provisioned", prProvisioned,
"accepted", prAccepted)

"failed", isFailed(pr),
"provisioned", isProvisioned(pr),
"accepted", isAccepted(pr),
"bookingExpired", isBookingExpired(pr),
"capacityRevoked", isCapacityRevoked(pr))
switch {
case prFailed:
case isFailed(pr):
if checkState.State != kueue.CheckStateRejected {
if attempt := getAttempt(ctx, pr, wl.Name, check); attempt <= MaxRetries {
if attempt := getAttempt(log, pr, wl.Name, check); attempt <= MaxRetries {
// it is going to be retried
message := fmt.Sprintf("Retrying after failure: %s", apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message)
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
Expand All @@ -534,7 +469,15 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message
}
}
case prProvisioned:
case isCapacityRevoked(pr):
if workload.IsActive(wl) && !workload.IsFinished(wl) {
// We mark the admission check as rejected to trigger workload eviction.
// This is needed to prevent replacement pods being stuck in the pending phase indefinitely
// as the nodes are already deleted by Cluster Autoscaler.
updated = updateCheckState(&checkState, kueue.CheckStateRejected) || updated
updated = updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.CapacityRevoked).Message) || updated
}
case isProvisioned(pr):
if updateCheckState(&checkState, kueue.CheckStateReady) {
updated = true
// add the pod podSetUpdates
Expand All @@ -543,7 +486,7 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
// to change to the "successfully provisioned" message after provisioning
updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Provisioned).Message)
}
case prAccepted:
case isAccepted(pr):
if provisionedCond := apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Provisioned); provisionedCond != nil {
// propagate the ETA update from the provisioning request into the workload
updated = updateCheckMessage(&checkState, provisionedCond.Message) || updated
Expand Down Expand Up @@ -767,21 +710,6 @@ func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {
Complete(acReconciler)
}

// GetProvisioningRequestName builds the ProvisioningRequest name for the given
// workload, admission check and attempt number as
// "<workloadName>-<checkName>-<attempt>", passed through limitObjectName.
func GetProvisioningRequestName(workloadName, checkName string, attempt int32) string {
	return limitObjectName(fmt.Sprintf("%s-%s-%d", workloadName, checkName, attempt))
}

// getProvisioningRequestNamePrefix builds the "<workloadName>-<checkName>-"
// prefix (note the trailing dash) and passes it through limitObjectName.
func getProvisioningRequestNamePrefix(workloadName, checkName string) string {
	return limitObjectName(fmt.Sprintf("%s-%s-", workloadName, checkName))
}

func getProvisioningRequestPodTemplateName(prName, podsetName string) string {
fullName := fmt.Sprintf("%s-%s-%s", podTemplatesPrefix, prName, podsetName)
return limitObjectName(fullName)
}

func limitObjectName(fullName string) string {
if len(fullName) <= objNameMaxPrefixLength {
return fullName
Expand All @@ -791,48 +719,3 @@ func limitObjectName(fullName string) string {
hashBytes := hex.EncodeToString(h.Sum(nil))
return fmt.Sprintf("%s-%s", fullName[:objNameMaxPrefixLength], hashBytes[:objNameHashLength])
}

// matches reports whether the provisioning request's name matches the
// expression built by getAttemptRegex for the given workload and check names.
func matches(pr *autoscaling.ProvisioningRequest, workloadName, checkName string) bool {
	return getAttemptRegex(workloadName, checkName).MatchString(pr.Name)
}

// getAttempt returns the attempt number captured from the provisioning
// request's name by the expression from getAttemptRegex.
//
// It returns 1, after logging, when the name does not match the expression or
// when the captured digits cannot be parsed as an int32. The latter includes
// values too large for int32, which are reported rather than silently
// truncated.
func getAttempt(ctx context.Context, pr *autoscaling.ProvisioningRequest, workloadName, checkName string) int32 {
	logger := log.FromContext(ctx)

	submatches := getAttemptRegex(workloadName, checkName).FindStringSubmatch(pr.Name)
	if len(submatches) == 0 {
		logger.Info("No attempt suffix in provisioning request", "requestName", pr.Name)
		return 1
	}

	// bitSize 32 makes ParseInt fail with a range error for values that do
	// not fit in int32, so the conversion below can never wrap around.
	number, err := strconv.ParseInt(submatches[1], 10, 32)
	if err != nil {
		logger.Error(err, "Parsing the attempt number from provisioning request", "requestName", pr.Name)
		return 1
	}
	return int32(number)
}

// getAttemptRegex compiles an anchored expression that matches the name prefix
// from getProvisioningRequestNamePrefix followed by one or more decimal
// digits; the digits are the only capture group. The prefix is escaped with
// regexp.QuoteMeta, so it is matched literally.
func getAttemptRegex(workloadName, checkName string) *regexp.Regexp {
	literalPrefix := regexp.QuoteMeta(getProvisioningRequestNamePrefix(workloadName, checkName))
	pattern := "^" + literalPrefix + "([0-9]+)$"
	return regexp.MustCompile(pattern)
}

// remainingTime returns how much of the backoff period is still left after
// the last failure; the result is zero or negative once the backoff has
// elapsed.
//
// The backoff starts at MinBackoffSeconds seconds and doubles for every
// failure after the first, being clamped to 30 minutes as soon as a doubling
// reaches that limit. prc is not consulted by this implementation.
func remainingTime(prc *kueue.ProvisioningRequestConfig, failuresCount int32, lastFailureTime time.Time) time.Duration {
	const backoffCap = 30 * time.Minute

	backoff := time.Duration(MinBackoffSeconds) * time.Second
	// One doubling per failure beyond the first.
	for pending := failuresCount; pending > 1; pending-- {
		backoff *= 2
		if backoff >= backoffCap {
			backoff = backoffCap
			break
		}
	}

	elapsed := time.Since(lastFailureTime)
	return backoff - elapsed
}
Loading