
Revert "scheduler : fix deviceshare plugin of add\remove pod (koordin…
Browse files Browse the repository at this point in the history
…ator-sh#1963)"

This reverts commit ad01533.
xulinfei.xlf committed Mar 25, 2024
1 parent ad01533 commit c3179a3
Showing 6 changed files with 34 additions and 483 deletions.
14 changes: 0 additions & 14 deletions pkg/scheduler/plugins/deviceshare/device_resources.go
@@ -23,7 +23,6 @@ import (
     "k8s.io/apimachinery/pkg/util/sets"
     quotav1 "k8s.io/apiserver/pkg/quota/v1"
 
-    apiext "github.com/koordinator-sh/koordinator/apis/extension"
     schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
     "github.com/koordinator-sh/koordinator/pkg/util"
 )
@@ -34,19 +33,6 @@
 // "1": {koordinator.sh/gpu-core:100, koordinator.sh/gpu-memory-ratio:100, koordinator.sh/gpu-memory: 16GB}
 type deviceResources map[int]corev1.ResourceList
 
-func newDeviceResourcesFromAllocations(allocation apiext.DeviceAllocations) map[schedulingv1alpha1.DeviceType]deviceResources {
-    result := make(map[schedulingv1alpha1.DeviceType]deviceResources)
-
-    for deviceType, deviceDetails := range allocation {
-        result[deviceType] = make(deviceResources)
-        for _, deviceDetail := range deviceDetails {
-            result[deviceType][int(deviceDetail.Minor)] = deviceDetail.Resources.DeepCopy()
-        }
-    }
-
-    return result
-}
-
 func (r deviceResources) DeepCopy() deviceResources {
     if r == nil {
         return nil
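To make the deviceResources shape above concrete, here is a hedged, illustrative literal. It is not part of this commit; the apiext resource-name constants come from koordinator's apis/extension package, and the value itself is invented to mirror the doc comment:

// Hypothetical package-level example matching the doc comment's "0": {...} entry:
// GPU minor 0 with all of its core, memory ratio, and memory in use.
var exampleUsed = deviceResources{
    0: corev1.ResourceList{
        apiext.ResourceGPUCore:        resource.MustParse("100"),
        apiext.ResourceGPUMemoryRatio: resource.MustParse("100"),
        apiext.ResourceGPUMemory:      resource.MustParse("16Gi"),
    },
}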
15 changes: 0 additions & 15 deletions pkg/scheduler/plugins/deviceshare/device_resources_test.go
@@ -20,7 +20,6 @@ import (
     "testing"
 
     "github.com/stretchr/testify/assert"
-    corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     "k8s.io/apimachinery/pkg/util/sets"
 
@@ -192,17 +191,3 @@ func Test_appendAllocatedByHints(t *testing.T) {
         })
     }
 }
-
-func TestNewDeviceResourcesFromAllocations(t *testing.T) {
-    allocation := make(apiext.DeviceAllocations)
-    allocation[schedulingv1alpha1.GPU] = append(allocation[schedulingv1alpha1.GPU], &apiext.DeviceAllocation{
-        Minor: 0,
-        Resources: corev1.ResourceList{
-            apiext.ResourceGPUCore: resource.MustParse("100"),
-        },
-    })
-
-    result := newDeviceResourcesFromAllocations(allocation)
-    res := result[schedulingv1alpha1.GPU][0][apiext.ResourceGPUCore]
-    assert.Equal(t, "100", res.String())
-}
69 changes: 0 additions & 69 deletions pkg/scheduler/plugins/deviceshare/nominator.go

This file was deleted.
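GitHub hides the contents of deleted files, so the removed nominator is not shown here. Based purely on its call sites removed from plugin.go below (NewNominator, IsPodExist, GetPodAllocated, AddPod, RemovePod), it looked roughly like the following sketch; the UID key, field names, and locking scheme are assumptions, not the original source:

// Hypothetical reconstruction of nominator.go -- not the deleted source.
package deviceshare

import (
    "sync"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"

    schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)

// nominator caches the simulated device placement computed for a nominated pod
// in AddPod so that Reserve/Unreserve can look it up and clear it later.
type nominator struct {
    lock         sync.RWMutex
    podAllocated map[types.UID]map[schedulingv1alpha1.DeviceType]deviceResources
}

func NewNominator() *nominator {
    return &nominator{
        podAllocated: map[types.UID]map[schedulingv1alpha1.DeviceType]deviceResources{},
    }
}

func (n *nominator) AddPod(pod *corev1.Pod, allocated map[schedulingv1alpha1.DeviceType]deviceResources) {
    n.lock.Lock()
    defer n.lock.Unlock()
    n.podAllocated[pod.UID] = allocated
}

func (n *nominator) RemovePod(pod *corev1.Pod) {
    n.lock.Lock()
    defer n.lock.Unlock()
    delete(n.podAllocated, pod.UID)
}

func (n *nominator) IsPodExist(pod *corev1.Pod) bool {
    n.lock.RLock()
    defer n.lock.RUnlock()
    _, ok := n.podAllocated[pod.UID]
    return ok
}

func (n *nominator) GetPodAllocated(pod *corev1.Pod) map[schedulingv1alpha1.DeviceType]deviceResources {
    n.lock.RLock()
    defer n.lock.RUnlock()
    return n.podAllocated[pod.UID]
}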

52 changes: 0 additions & 52 deletions pkg/scheduler/plugins/deviceshare/nominator_test.go

This file was deleted.
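The deleted test file is likewise hidden. A minimal sketch of the behavior it plausibly covered, written against the hypothetical reconstruction above rather than the original code (imports mirror device_resources_test.go, plus metav1 for ObjectMeta):

// Hypothetical round-trip test: add a nominated pod's placement, read it back,
// then remove it.
func TestNominatorAddRemovePod(t *testing.T) {
    nominator := NewNominator()
    pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod-1", UID: "pod-1-uid"}}
    assert.False(t, nominator.IsPodExist(pod))

    allocated := map[schedulingv1alpha1.DeviceType]deviceResources{
        schedulingv1alpha1.GPU: {
            0: corev1.ResourceList{apiext.ResourceGPUCore: resource.MustParse("100")},
        },
    }
    nominator.AddPod(pod, allocated)
    assert.True(t, nominator.IsPodExist(pod))
    assert.Equal(t, allocated, nominator.GetPodAllocated(pod))

    nominator.RemovePod(pod)
    assert.False(t, nominator.IsPodExist(pod))
}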

130 changes: 33 additions & 97 deletions pkg/scheduler/plugins/deviceshare/plugin.go
@@ -70,7 +70,6 @@ type Plugin struct {
     handle          frameworkext.ExtendedHandle
     nodeDeviceCache *nodeDeviceCache
     scorer          *resourceAllocationScorer
-    nominator       *nominator
 }
 
 type preFilterState struct {
@@ -179,40 +178,12 @@ func (p *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, p…
     nd.lock.RLock()
     defer nd.lock.RUnlock()
 
-    var podAllocated map[schedulingv1alpha1.DeviceType]deviceResources
-    podAllocated = nd.getUsed(podInfoToAdd.Pod.Namespace, podInfoToAdd.Pod.Name)
-
-    // Generally, when AddPod runs here the pod may still be in scheduling and thus
-    // not yet in nodeDeviceCache's used map. So when the framework runs
-    // RunFilterPluginsWithNominatedPods with AddPod for high-priority pods, the
-    // plugin cannot reserve resources for them; in the RDMA/VF scenario they can
-    // fail to assign because some resources are already held by low-priority pods.
-    // We therefore reuse the "Reserve" logic to generate a placement and save it in
-    // the nominator, which is cleared in both "Reserve" and "Unreserve" whether or
-    // not the assignment succeeds, matching the upstream nomination process.
-    if len(podAllocated) == 0 && p.nominator != nil {
-        if p.nominator.IsPodExist(podInfoToAdd.Pod) {
-            podAllocated = p.nominator.GetPodAllocated(podInfoToAdd.Pod)
-        } else {
-            assignFunc := func(state *preFilterState, nodeDeviceInfo *nodeDevice, pod *corev1.Pod, assignResult apiext.DeviceAllocations) {
-                // do nothing
-            }
-            result, _ := p.reserveInternal(ctx, cycleState, state, podInfoToAdd.Pod, nodeInfo, nd, assignFunc)
-            podAllocated = newDeviceResourcesFromAllocations(result)
-
-            p.nominator.AddPod(podInfoToAdd.Pod, podAllocated)
-        }
-    }
-
+    podAllocated := nd.getUsed(podInfoToAdd.Pod.Namespace, podInfoToAdd.Pod.Name)
     if len(podAllocated) == 0 {
         return nil
     }
 
-    var rInfo *frameworkext.ReservationInfo
-    if reservation.GetReservationCache() != nil {
-        rInfo = reservation.GetReservationCache().GetReservationInfoByPod(podInfoToAdd.Pod, node.Name)
-    }
-
+    rInfo := reservation.GetReservationCache().GetReservationInfoByPod(podInfoToAdd.Pod, node.Name)
     if rInfo == nil {
         nominator := p.handle.GetReservationNominator()
         if nominator != nil {
@@ -262,25 +233,12 @@ func (p *Plugin) RemovePod(ctx context.Context, cycleState *framework.CycleState…
     nd.lock.RLock()
     defer nd.lock.RUnlock()
 
-    var podAllocated map[schedulingv1alpha1.DeviceType]deviceResources
-    podAllocated = nd.getUsed(podInfoToRemove.Pod.Namespace, podInfoToRemove.Pod.Name)
-
-    if len(podAllocated) == 0 && p.nominator != nil {
-        if p.nominator.IsPodExist(podInfoToRemove.Pod) {
-            podAllocated = p.nominator.GetPodAllocated(podInfoToRemove.Pod)
-            p.nominator.RemovePod(podInfoToRemove.Pod)
-        }
-    }
-
+    podAllocated := nd.getUsed(podInfoToRemove.Pod.Namespace, podInfoToRemove.Pod.Name)
     if len(podAllocated) == 0 {
         return nil
     }
 
-    var rInfo *frameworkext.ReservationInfo
-    if reservation.GetReservationCache() != nil {
-        rInfo = reservation.GetReservationCache().GetReservationInfoByPod(podInfoToRemove.Pod, node.Name)
-    }
-
+    rInfo := reservation.GetReservationCache().GetReservationInfoByPod(podInfoToRemove.Pod, node.Name)
     if rInfo == nil {
         nominator := p.handle.GetReservationNominator()
         if nominator != nil {
@@ -416,8 +374,25 @@ func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.Cy…
     return status
 }
 
-func (p *Plugin) reserveInternal(ctx context.Context, cycleState *framework.CycleState, state *preFilterState, pod *corev1.Pod,
-    nodeInfo *framework.NodeInfo, nodeDeviceInfo *nodeDevice, assignFunc func(state *preFilterState, nodeDeviceInfo *nodeDevice, pod *corev1.Pod, assignResult apiext.DeviceAllocations)) (apiext.DeviceAllocations, *framework.Status) {
+func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
+    state, status := getPreFilterState(cycleState)
+    if !status.IsSuccess() {
+        return status
+    }
+    if state.skip {
+        return nil
+    }
+
+    nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+    if err != nil {
+        return framework.AsStatus(err)
+    }
+
+    nodeDeviceInfo := p.nodeDeviceCache.getNodeDevice(nodeName, false)
+    if nodeDeviceInfo == nil {
+        return nil
+    }
+
     store := topologymanager.GetStore(cycleState)
     affinity := store.GetAffinity(nodeInfo.Node().Name)
 
@@ -431,68 +406,30 @@
     }
 
     reservationRestoreState := getReservationRestoreState(cycleState)
-    restoreState := reservationRestoreState.getNodeState(nodeInfo.Node().Name)
-    preemptible := appendAllocated(nil, restoreState.mergedUnmatchedUsed, state.preemptibleDevices[nodeInfo.Node().Name])
+    restoreState := reservationRestoreState.getNodeState(nodeName)
+    preemptible := appendAllocated(nil, restoreState.mergedUnmatchedUsed, state.preemptibleDevices[nodeName])
 
+    nodeDeviceInfo.lock.Lock()
+    defer nodeDeviceInfo.lock.Unlock()
+
     result, status := p.allocateWithNominatedReservation(
         allocator, cycleState, state, restoreState, nodeInfo.Node(), pod, preemptible)
     if !status.IsSuccess() {
-        return nil, status
+        return status
     }
     if len(result) == 0 {
         preemptible = appendAllocated(preemptible, restoreState.mergedMatchedAllocatable)
         result, status = allocator.Allocate(nil, nil, nil, preemptible)
         if !status.IsSuccess() {
-            return nil, status
+            return status
         }
     }
-
-    assignFunc(state, nodeDeviceInfo, pod, result)
-
-    return result, nil
-}
-
-func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
-    if p.nominator != nil && p.nominator.IsPodExist(pod) {
-        p.nominator.RemovePod(pod)
-    }
-
-    state, status := getPreFilterState(cycleState)
-    if !status.IsSuccess() {
-        return status
-    }
-    if state.skip {
-        return nil
-    }
-
-    nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
-    if err != nil {
-        return framework.AsStatus(err)
-    }
-
-    nodeDeviceInfo := p.nodeDeviceCache.getNodeDevice(nodeName, false)
-    if nodeDeviceInfo == nil {
-        return nil
-    }
-
-    nodeDeviceInfo.lock.Lock()
-    defer nodeDeviceInfo.lock.Unlock()
-
-    assignFunc := func(state *preFilterState, nodeDeviceInfo *nodeDevice, pod *corev1.Pod, assignResult apiext.DeviceAllocations) {
-        nodeDeviceInfo.updateCacheUsed(assignResult, pod, true)
-        state.allocationResult = assignResult
-    }
-
-    _, status = p.reserveInternal(ctx, cycleState, state, pod, nodeInfo, nodeDeviceInfo, assignFunc)
-
-    return status
+    nodeDeviceInfo.updateCacheUsed(result, pod, true)
+    state.allocationResult = result
+    return nil
 }
 
 func (p *Plugin) Unreserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) {
-    if p.nominator != nil && p.nominator.IsPodExist(pod) {
-        p.nominator.RemovePod(pod)
-    }
-
     state, status := getPreFilterState(cycleState)
     if !status.IsSuccess() {
         return
@@ -625,6 +562,5 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
         handle:          extendedHandle,
         nodeDeviceCache: deviceCache,
         scorer:          scorePlugin(args),
-        nominator:       NewNominator(),
     }, nil
 }