DRA: fix scheduler/resource claim controller race with retry
The JSON patch approach works, but it is complex. A retry loop is easier to
understand (detect the conflict, get the latest claim, try again). It costs one
additional API call (the get) per conflict, but in practice conflicts are unlikely.
pohly committed Jun 27, 2024
1 parent ecbafb8 commit 4bddebc
Showing 2 changed files with 185 additions and 92 deletions.
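
Background for the diff below: the new code leans on client-go's RetryOnConflict helper, which reruns an update closure for as long as it fails with a ResourceVersion conflict and gives up immediately on any other error. A minimal, self-contained sketch of that pattern (illustrative group/resource/names, not the plugin code):

package main

import (
    "fmt"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/util/retry"
)

func main() {
    attempts := 0
    // RetryOnConflict re-runs the callback only while it returns a Conflict
    // error, sleeping briefly between attempts according to retry.DefaultRetry.
    err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
        attempts++
        if attempts < 3 {
            // Simulate the apiserver rejecting a write because our copy of the
            // object (its ResourceVersion) is stale.
            return apierrors.NewConflict(
                schema.GroupResource{Group: "resource.k8s.io", Resource: "resourceclaims"},
                "my-claim", fmt.Errorf("the object has been modified"))
        }
        return nil // the write went through
    })
    fmt.Printf("succeeded after %d attempts, err=%v\n", attempts, err)
}

Because only Conflict errors are retried, the "claim got deleted" and "claim got allocated elsewhere" errors returned inside the new bindClaim closure abort the loop right away.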
168 changes: 76 additions & 92 deletions pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
@@ -18,12 +18,10 @@ package dynamicresources
 
 import (
     "context"
-    "encoding/json"
     "errors"
     "fmt"
     "slices"
     "sort"
-    "strings"
     "sync"
 
     "github.com/google/go-cmp/cmp"
@@ -41,6 +39,7 @@ import (
     "k8s.io/client-go/kubernetes"
     resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
     "k8s.io/client-go/tools/cache"
+    "k8s.io/client-go/util/retry"
     "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
     "k8s.io/dynamic-resource-allocation/resourceclaim"
     "k8s.io/klog/v2"
@@ -1461,7 +1460,9 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
     // The allocation would be enough. The full object is useful for
     // debugging and testing, so let's make it realistic.
     claim = claim.DeepCopy()
-    claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
+    if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
+        claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
+    }
     claim.Status.DriverName = driverName
     claim.Status.Allocation = allocation
     pl.inFlightAllocations.Store(claim.UID, claim)
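
The guard above matters because Reserve may see a claim that already carries the finalizer, for example after an interrupted scheduling attempt (the new test cases below cover exactly that); appending unconditionally would duplicate the entry in the scheduler's in-memory copy. A minimal sketch of the idempotent append, using an illustrative finalizer string rather than the real resourcev1alpha2.Finalizer constant:

package main

import (
    "fmt"
    "slices"
)

func addFinalizer(finalizers []string, finalizer string) []string {
    // Append only when the entry is not present yet, so calling this twice
    // (or starting from an object that already has it) changes nothing.
    if !slices.Contains(finalizers, finalizer) {
        finalizers = append(finalizers, finalizer)
    }
    return finalizers
}

func main() {
    const finalizer = "example.com/claim-protection" // illustrative value
    f := addFinalizer(nil, finalizer)
    f = addFinalizer(f, finalizer)
    fmt.Println(f) // [example.com/claim-protection]
}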
@@ -1620,105 +1621,88 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat
 // and reservation are recorded. This finishes the work started in Reserve.
 func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, index int, pod *v1.Pod, nodeName string) (patchedClaim *resourcev1alpha2.ResourceClaim, finalErr error) {
     logger := klog.FromContext(ctx)
-    claim := state.claims[index]
-    driverName := ""
-    allocationObject := ""
-
+    claim := state.claims[index].DeepCopy()
     allocation := state.informationsForClaim[index].allocation
-    logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))
-
-    // Do we need to store an allocation result from Reserve?
-    if allocation != nil {
-        buffer, err := json.Marshal(allocation)
-        if err != nil {
-            return nil, fmt.Errorf("marshaling AllocationResult failed: %v", err)
-        }
-        driverName = state.informationsForClaim[index].allocationDriverName
-        allocationObject = string(buffer)
-
-        // The finalizer needs to be added in a normal update. Using a simple update is fine
-        // because we don't expect concurrent modifications while the claim is not allocated
-        // yet. If there are any, we want to fail.
-        //
-        // If we were interrupted in the past, it might already be set and we simply continue.
-        if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
-            claim := state.claims[index].DeepCopy()
-            claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
-            if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
-                return nil, fmt.Errorf("add finalizer: %v", err)
-            }
-        }
-    }
-
-    // The claim might be stale, for example because the claim can get shared and some
-    // other goroutine has updated it in the meantime. We therefore cannot use
-    // SSA here to add the pod because then we would have to send the entire slice
-    // or use different field manager strings for each entry.
-    //
-    // With a strategic-merge-patch, we can simply send one new entry. The apiserver
-    // validation will catch if two goroutines try to do that at the same time and
-    // the claim cannot be shared.
-    //
-    // Note that this also works when the allocation result gets added twice because
-    // two pods both started using a shared claim: the first pod to get here adds the
-    // allocation result. The second pod then only adds itself to reservedFor.
-    //
-    // Because we allow concurrent updates, we don't check ResourceVersion
-    // in a precondition. What we *do* need to check is that the finalizer
-    // is still present. This implies using a JSON patch instead of a
-    // simpler strategic-merge-patch.
-    parts := make([]string, 0, 7)
-    parts = append(parts,
-        fmt.Sprintf(`{ "op": "test", "path": "/metadata/uid", "value": %q }`, claim.UID),
-    )
-    // Strictly speaking, we only need to check for existence of our finalizer.
-    // JSON patch does not support that, so we check for the stronger "array is equal".
-    if len(claim.Finalizers) > 0 {
-        parts = append(parts,
-            fmt.Sprintf(`{ "op": "test", "path": "/metadata/finalizers", "value": %q }`, claim.Finalizers),
-        )
-    }
-    // JSON patch can only append to a non-empty array. An empty reservedFor gets
-    // omitted and even if it didn't, it would be null and not an empty array.
-    // Therefore we have to test and add if it's currently empty.
-    reservedForEntry := fmt.Sprintf(`{"resource": "pods", "name": %q, "uid": %q}`, pod.Name, pod.UID)
-    if len(claim.Status.ReservedFor) == 0 {
-        parts = append(parts,
-            fmt.Sprintf(`{ "op": "test", "path": "/status/reservedFor", "value": null }`),
-            fmt.Sprintf(`{ "op": "add", "path": "/status/reservedFor", "value": [ %s ] }`, reservedForEntry),
-        )
-    } else {
-        parts = append(parts,
-            fmt.Sprintf(`{ "op": "add", "path": "/status/reservedFor/-", "value": %s }`, reservedForEntry),
-        )
-    }
-    if allocationObject != "" {
-        parts = append(parts,
-            fmt.Sprintf(`{ "op": "add", "path": "/status/driverName", "value": %q }`, driverName),
-            fmt.Sprintf(`{ "op": "add", "path": "/status/allocation", "value": %s }`, allocationObject),
-        )
-    }
-    patch := "[\n" + strings.Join(parts, ",\n") + "\n]"
-    if loggerV := logger.V(6); loggerV.Enabled() {
-        logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim), "patch", patch)
-    } else {
-        logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
-    }
-    claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.JSONPatchType, []byte(patch), metav1.PatchOptions{}, "status")
-    logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim), "err", err)
-    if allocationObject != "" {
-        // The scheduler was handling allocation. Now that has
-        // completed, either successfully or with a failure.
-        if err == nil {
-            // This can fail, but only for reasons that are okay (concurrent delete or update).
-            // Shouldn't happen in this case.
-            if err := pl.claimAssumeCache.Assume(claim); err != nil {
-                logger.V(5).Info("Claim not stored in assume cache", "err", err)
-            }
-        }
-        pl.inFlightAllocations.Delete(claim.UID)
-    }
-    return claim, err
+    defer func() {
+        if allocation != nil {
+            // The scheduler was handling allocation. Now that has
+            // completed, either successfully or with a failure.
+            if finalErr == nil {
+                // This can fail, but only for reasons that are okay (concurrent delete or update).
+                // Shouldn't happen in this case.
+                if err := pl.claimAssumeCache.Assume(claim); err != nil {
+                    logger.V(5).Info("Claim not stored in assume cache", "err", finalErr)
+                }
+            }
+            pl.inFlightAllocations.Delete(claim.UID)
+        }
+    }()
+
+    logger.V(5).Info("preparing claim status update", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))
+
+    // We may run into a ResourceVersion conflict because there may be some
+    // benign concurrent changes. In that case we get the latest claim and
+    // try again.
+    refreshClaim := false
+    retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+        if refreshClaim {
+            updatedClaim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{})
+            if err != nil {
+                return fmt.Errorf("get updated claim %s after conflict: %w", klog.KObj(claim), err)
+            }
+            logger.V(5).Info("retrying update after conflict", "claim", klog.KObj(claim))
+            claim = updatedClaim
+        } else {
+            // All future retries must get a new claim first.
+            refreshClaim = true
+        }
+
+        if claim.DeletionTimestamp != nil {
+            return fmt.Errorf("claim %s got deleted in the meantime", klog.KObj(claim))
+        }
+
+        // Do we need to store an allocation result from Reserve?
+        if allocation != nil {
+            if claim.Status.Allocation != nil {
+                return fmt.Errorf("claim %s got allocated elsewhere in the meantime", klog.KObj(claim))
+            }
+
+            // The finalizer needs to be added in a normal update.
+            // If we were interrupted in the past, it might already be set and we simply continue.
+            if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
+                claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
+                updatedClaim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
+                if err != nil {
+                    return fmt.Errorf("add finalizer to claim %s: %w", klog.KObj(claim), err)
+                }
+                claim = updatedClaim
+            }
+
+            claim.Status.DriverName = state.informationsForClaim[index].allocationDriverName
+            claim.Status.Allocation = allocation
+        }
+
+        // We can simply try to add the pod here without checking
+        // preconditions. The apiserver will tell us with a
+        // non-conflict error if this isn't possible.
+        claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: pod.Name, UID: pod.UID})
+        updatedClaim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
+        if err != nil {
+            if allocation != nil {
+                return fmt.Errorf("add allocation and reservation to claim %s: %w", klog.KObj(claim), err)
+            }
+            return fmt.Errorf("add reservation to claim %s: %w", klog.KObj(claim), err)
+        }
+        claim = updatedClaim
+        return nil
+    })
+
+    if retryErr != nil {
+        return nil, retryErr
+    }
+
+    logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim))
+    return claim, nil
 }
 
 // PostBind is called after a pod is successfully bound to a node. Now we are
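
For reference, the removed code above assembled a JSON patch along the following lines for a claim that already carries the finalizer and has an empty reservedFor (all UIDs, names, and the allocation value are illustrative):

[
  { "op": "test", "path": "/metadata/uid", "value": "5d79c7a2-0000-0000-0000-000000000000" },
  { "op": "test", "path": "/metadata/finalizers", "value": ["resource.kubernetes.io/delete-protection"] },
  { "op": "test", "path": "/status/reservedFor", "value": null },
  { "op": "add", "path": "/status/reservedFor", "value": [ {"resource": "pods", "name": "my-pod", "uid": "9a1b2c3d-0000-0000-0000-000000000000"} ] },
  { "op": "add", "path": "/status/driverName", "value": "example.driver.k8s.io" },
  { "op": "add", "path": "/status/allocation", "value": { "shareable": true } }
]

Each "test" operation makes the whole patch fail if its precondition no longer holds, which is how the old code defended against a concurrently removed finalizer; the retry loop gets an equivalent guarantee from the ResourceVersion check built into plain updates plus the explicit re-checks after every refresh.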
@@ -580,6 +580,115 @@ func TestPlugin(t *testing.T) {
},
},
},
"delayed-allocation-structured-with-resources-has-finalizer": {
// As before, but the finalizer is already set. Could happen if
// the scheduler got interrupted.
pod: podWithClaimName,
claims: func() []*resourcev1alpha2.ResourceClaim {
claim := pendingDelayedClaim.DeepCopy()
claim.Finalizers = structuredAllocatedClaim.Finalizers
return []*resourcev1alpha2.ResourceClaim{claim}
}(),
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
objs: []apiruntime.Object{workerNodeSlice},
want: want{
reserve: result{
inFlightClaim: structuredAllocatedClaim,
},
prebind: result{
assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Status = structuredInUseClaim.Status
}
return claim
},
},
},
postbind: result{
assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
},
},
},
"delayed-allocation-structured-with-resources-finalizer-gets-removed": {
// As before, but the finalizer is already set. Then it gets
// removed before the scheduler reaches PreBind.
pod: podWithClaimName,
claims: func() []*resourcev1alpha2.ResourceClaim {
claim := pendingDelayedClaim.DeepCopy()
claim.Finalizers = structuredAllocatedClaim.Finalizers
return []*resourcev1alpha2.ResourceClaim{claim}
}(),
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
objs: []apiruntime.Object{workerNodeSlice},
prepare: prepare{
prebind: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
claim.Finalizers = nil
return claim
},
},
},
want: want{
reserve: result{
inFlightClaim: structuredAllocatedClaim,
},
prebind: result{
assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Finalizers = structuredAllocatedClaim.Finalizers
claim.Status = structuredInUseClaim.Status
}
return claim
},
},
},
postbind: result{
assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
},
},
},
"delayed-allocation-structured-with-resources-finalizer-gets-added": {
// No finalizer initially, then it gets added before
// the scheduler reaches PreBind. Shouldn't happen?
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClass},
objs: []apiruntime.Object{workerNodeSlice},
prepare: prepare{
prebind: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
claim.Finalizers = structuredAllocatedClaim.Finalizers
return claim
},
},
},
want: want{
reserve: result{
inFlightClaim: structuredAllocatedClaim,
},
prebind: result{
assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
changes: change{
claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
if claim.Name == claimName {
claim = claim.DeepCopy()
claim.Status = structuredInUseClaim.Status
}
return claim
},
},
},
postbind: result{
assumedClaim: reserve(structuredAllocatedClaim, podWithClaimName),
},
},
},
"delayed-allocation-structured-skip-bind": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
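
The new test cases above vary the finalizer between Reserve and PreBind; the conflict-retry path itself can be exercised by injecting Conflict errors into a fake clientset. A hypothetical sketch of that wiring (not part of this commit's test harness):

package main

import (
    "fmt"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    clientsetfake "k8s.io/client-go/kubernetes/fake"
    k8stesting "k8s.io/client-go/testing"
)

func main() {
    client := clientsetfake.NewSimpleClientset()
    remainingConflicts := 2
    // Reactors run in order before the default object tracker. Returning
    // handled=true short-circuits the call with our injected Conflict error;
    // once the budget is used up we fall through and the update proceeds.
    client.PrependReactor("update", "resourceclaims", func(action k8stesting.Action) (bool, runtime.Object, error) {
        if remainingConflicts > 0 {
            remainingConflicts--
            return true, nil, apierrors.NewConflict(
                schema.GroupResource{Group: "resource.k8s.io", Resource: "resourceclaims"},
                "my-claim", fmt.Errorf("the object has been modified"))
        }
        return false, nil, nil
    })
    fmt.Println("fake clientset ready:", client != nil)
}

With two injected conflicts, a RetryOnConflict loop like the one in bindClaim would refresh the claim twice and succeed on the third attempt.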
