Skip to content

Commit

Permalink
scheduler: fix reservation plugin clone concurrent read write (#2084)
Browse files Browse the repository at this point in the history
Signed-off-by: xingbao.zy <xingbao.zy@alibaba-inc.com>
Co-authored-by: xingbao.zy <xingbao.zy@alibaba-inc.com>
  • Loading branch information
buptcozy and xingbao.zy authored Jun 7, 2024
1 parent a98e50d commit 1da7b33
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -148,6 +149,7 @@ func (pl *Plugin) EventsToRegister() []framework.ClusterEventWithHint {
var _ framework.StateData = &stateData{}

type stateData struct {
preemptLock sync.RWMutex
hasAffinity bool
podRequests corev1.ResourceList
podRequestsResources *framework.Resource
Expand Down Expand Up @@ -188,6 +190,10 @@ func (s *stateData) Clone() framework.StateData {
preferredNode: s.preferredNode,
assumed: s.assumed,
}

s.preemptLock.RLock()
defer s.preemptLock.RUnlock()

preemptible := map[string]corev1.ResourceList{}
for nodeName, returned := range s.preemptible {
preemptible[nodeName] = returned.DeepCopy()
Expand Down Expand Up @@ -244,6 +250,7 @@ func (pl *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleStat
}

var preResult *framework.PreFilterResult

state := getStateData(cycleState)
if state.hasAffinity {
if len(state.nodeReservationStates) == 0 {
Expand Down Expand Up @@ -275,6 +282,9 @@ func (pl *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState,
if rInfo == nil {
rInfo = pl.GetNominatedReservation(podInfoToAdd.Pod, node.Name)
}

state.preemptLock.Lock()
defer state.preemptLock.Unlock()
if rInfo == nil {
preemptible := state.preemptible[node.Name]
state.preemptible[node.Name] = quotav1.Subtract(preemptible, podRequests)
Expand Down Expand Up @@ -303,6 +313,9 @@ func (pl *Plugin) RemovePod(ctx context.Context, cycleState *framework.CycleStat
if rInfo == nil {
rInfo = pl.GetNominatedReservation(podInfoToRemove.Pod, node.Name)
}

state.preemptLock.Lock()
defer state.preemptLock.Unlock()
if rInfo == nil {
preemptible := state.preemptible[node.Name]
state.preemptible[node.Name] = quotav1.Add(preemptible, podRequests)
Expand Down Expand Up @@ -368,14 +381,24 @@ func (pl *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState,
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReservationAffinity)
}

if len(state.preemptible[node.Name]) > 0 || len(state.preemptibleInRRs[node.Name]) > 0 {
preemptible := state.preemptible[node.Name]
preemptibleResource := framework.NewResource(preemptible)
insufficientResources := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, nil, preemptibleResource)
if len(insufficientResources) != 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed)
status := func() *framework.Status {
state.preemptLock.RLock()
defer state.preemptLock.RUnlock()

if len(state.preemptible[node.Name]) > 0 || len(state.preemptibleInRRs[node.Name]) > 0 {
preemptible := state.preemptible[node.Name]
preemptibleResource := framework.NewResource(preemptible)
insufficientResources := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, nil, preemptibleResource)
if len(insufficientResources) != 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed)
}
}
return nil
}()
if !status.IsSuccess() {
return status
}

return nil
}

Expand All @@ -401,10 +424,13 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew
continue
}

state.preemptLock.RLock()
preemptibleInRR := state.preemptibleInRRs[node.Name][rInfo.UID()]
preemptible := framework.NewResource(preemptibleInRR)
preemptible.Add(state.preemptible[node.Name])
insufficientResourcesByNode := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, rInfo, preemptible)
state.preemptLock.RUnlock()

nodeFits := len(insufficientResourcesByNode) == 0
allocatePolicy := rInfo.GetAllocatePolicy()
if allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyDefault ||
Expand Down

0 comments on commit 1da7b33

Please sign in to comment.