Skip to content

Commit

Permalink
improve the nodeclaim sorting algorithm using heap
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Shane <327411586@qq.com>
  • Loading branch information
jxs1211 committed Dec 28, 2024
1 parent 1db5097 commit 7e40621
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 5 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ require (
sigs.k8s.io/yaml v1.4.0 // indirect
)

require github.com/stretchr/testify v1.9.0

require github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect

retract (
v0.100.101-test // accidentally published testing version
v0.35.3 // accidentally published incomplete patch release
Expand Down
46 changes: 46 additions & 0 deletions pkg/controllers/provisioning/scheduling/heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import "container/heap"

// NodeClaimHeap is a slice of *NodeClaim that implements heap.Interface.
// Its Less method compares the number of pods on each claim, so once the
// slice has been heapified the root is the claim holding the fewest pods.
type NodeClaimHeap []*NodeClaim

// Compile-time check that *NodeClaimHeap satisfies heap.Interface.
var _ heap.Interface = (*NodeClaimHeap)(nil)

// NewNodeClaimHeap wraps nodeClaims in a NodeClaimHeap and returns a pointer
// to it. The slice is converted, not copied, so the heap shares its backing
// array with the caller's slice and heap operations reorder that slice in
// place. The result is not heapified; callers run heap.Init on it before
// using heap.Push or heap.Pop.
func NewNodeClaimHeap(nodeClaims []*NodeClaim) *NodeClaimHeap {
	wrapped := new(NodeClaimHeap)
	*wrapped = nodeClaims
	return wrapped
}

// Len reports how many NodeClaims are currently in the heap.
func (h NodeClaimHeap) Len() int {
	return len(h)
}
// Less reports whether the claim at index i holds strictly fewer pods than
// the claim at index j. Claims with equal pod counts are not ordered relative
// to each other.
func (h NodeClaimHeap) Less(i, j int) bool {
	left, right := h[i], h[j]
	return len(left.Pods) < len(right.Pods)
}
// Swap exchanges the claims stored at indexes i and j.
func (h NodeClaimHeap) Swap(i, j int) {
	first := h[i]
	h[i] = h[j]
	h[j] = first
}

// Push appends x to the end of the underlying slice. x must be a *NodeClaim;
// any other dynamic type makes the type assertion panic. This method is the
// heap.Interface hook and does not restore heap order by itself.
func (h *NodeClaimHeap) Push(x interface{}) {
	claim := x.(*NodeClaim)
	*h = append(*h, claim)
}

// Pop removes the last element of the underlying slice and returns it as a
// *NodeClaim. This method is the heap.Interface hook: heap.Pop moves the
// minimum to the end before invoking it. The vacated slot in the backing
// array is left untouched.
func (h *NodeClaimHeap) Pop() interface{} {
	last := len(*h) - 1
	claim := (*h)[last]
	*h = (*h)[:last]
	return claim
}
131 changes: 131 additions & 0 deletions pkg/controllers/provisioning/scheduling/heap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import (
"container/heap"
"math/rand"
"sort"
"strconv"
"testing"

"github.com/samber/lo"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// TestNodeClaimHeap_PopOrder heapifies four NodeClaims holding 2, 1, 3 and 0
// pods and checks that heap.Pop yields them in ascending pod-count order and
// leaves the heap empty.
func TestNodeClaimHeap_PopOrder(t *testing.T) {
	twoPods := &NodeClaim{Pods: []*corev1.Pod{{}, {}}}
	onePod := &NodeClaim{Pods: []*corev1.Pod{{}}}
	threePods := &NodeClaim{Pods: []*corev1.Pod{{}, {}, {}}}
	noPods := &NodeClaim{Pods: []*corev1.Pod{}}

	// Insert in an order that is neither ascending nor descending.
	claimHeap := NewNodeClaimHeap([]*NodeClaim{twoPods, onePod, threePods, noPods})
	heap.Init(claimHeap)

	// Only pod counts are compared, matching the ordering key used by Less.
	for _, want := range []*NodeClaim{noPods, onePod, twoPods, threePods} {
		got := heap.Pop(claimHeap).(*NodeClaim)
		assert.Equal(t, len(want.Pods), len(got.Pods),
			"Expected NodeClaim with %d pods, got %d pods",
			len(want.Pods), len(got.Pods))
	}

	assert.Equal(t, 0, claimHeap.Len(), "Heap should be empty after popping all items")
}

func makeNodeClaims(size int) []*NodeClaim {
nodeClaims := make([]*NodeClaim, 0, size)
return lo.Map(nodeClaims, func(nc *NodeClaim, i int) *NodeClaim {
podCount := rand.Intn(100)

Check failure on line 62 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.25.x)

G404: Use of weak random number generator (math/rand or math/rand/v2 instead of crypto/rand) (gosec)

Check failure on line 62 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.26.x)

G404: Use of weak random number generator (math/rand or math/rand/v2 instead of crypto/rand) (gosec)

Check failure on line 62 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.27.x)

G404: Use of weak random number generator (math/rand or math/rand/v2 instead of crypto/rand) (gosec)

Check failure on line 62 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.28.x)

G404: Use of weak random number generator (math/rand or math/rand/v2 instead of crypto/rand) (gosec)

Check failure on line 62 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.29.x)

G404: Use of weak random number generator (math/rand or math/rand/v2 instead of crypto/rand) (gosec)

Check failure on line 62 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.30.x)

G404: Use of weak random number generator (math/rand or math/rand/v2 instead of crypto/rand) (gosec)

Check failure on line 62 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.31.x)

G404: Use of weak random number generator (math/rand or math/rand/v2 instead of crypto/rand) (gosec)
pods := lo.Map(lo.Range(podCount), func(_ int, j int) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pod-" + strconv.Itoa(j)},
}
})
return &NodeClaim{Pods: pods}
})
}

// cloneNodeClaims returns a new slice containing one freshly allocated
// NodeClaim per entry of original. Each copy receives new pods that carry only
// the source pod's name, and a NodeClaimTemplate whose spec has a single
// "custom-taint" NoSchedule taint. No other field of the source claims is
// carried over.
func cloneNodeClaims(original []*NodeClaim) []*NodeClaim {
	clones := make([]*NodeClaim, len(original))
	for i, src := range original {
		pods := make([]*corev1.Pod, len(src.Pods))
		for j, p := range src.Pods {
			pods[j] = &corev1.Pod{
				ObjectMeta: metav1.ObjectMeta{Name: p.Name},
			}
		}

		taint := corev1.Taint{
			Key:    "custom-taint",
			Effect: corev1.TaintEffectNoSchedule,
			Value:  "custom-value",
		}
		spec := v1.NodeClaimSpec{Taints: []corev1.Taint{taint}}

		clones[i] = &NodeClaim{
			Pods: pods,
			NodeClaimTemplate: NodeClaimTemplate{
				NodeClaim: v1.NodeClaim{Spec: spec},
			},
		}
	}
	return clones
}

var (
	// nodeClaims is the shared fixture that each benchmark clones before it
	// starts timing.
	nodeClaims = makeNodeClaims(3000)

	// noTolerationPod is a pod with no tolerations; the benchmarks pass it to
	// NodeClaim.Add for every claim they visit.
	noTolerationPod = &corev1.Pod{
		Spec: corev1.PodSpec{Tolerations: nil},
	}
)

func BenchmarkHeapSorting(b *testing.B) {
// clone the nodeClaims as identical test data for benchmark
nodeClaims := cloneNodeClaims(nodeClaims)

Check failure on line 107 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.25.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)

Check failure on line 107 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.26.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)

Check failure on line 107 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.27.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)

Check failure on line 107 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.28.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)

Check failure on line 107 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.29.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)

Check failure on line 107 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.30.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)

Check failure on line 107 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.31.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)
b.ResetTimer()
for i := 0; i < b.N; i++ {
h := NodeClaimHeap(nodeClaims)
heap.Init(&h)
for h.Len() > 0 {
nodeClaim := heap.Pop(&h).(*NodeClaim)
_ = nodeClaim.Add(noTolerationPod, nil)
}
}
}

func BenchmarkSliceSorting(b *testing.B) {
// clone the nodeClaims as identical test data for benchmark
nodeClaims := cloneNodeClaims(nodeClaims)

Check failure on line 121 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.25.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)

Check failure on line 121 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.26.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)

Check failure on line 121 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.27.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)

Check failure on line 121 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.28.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)

Check failure on line 121 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.29.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)

Check failure on line 121 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.30.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)

Check failure on line 121 in pkg/controllers/provisioning/scheduling/heap_test.go

View workflow job for this annotation

GitHub Actions / presubmit (1.31.x)

shadow: declaration of "nodeClaims" shadows declaration at line 98 (govet)
b.ResetTimer()
for i := 0; i < b.N; i++ {
sort.Slice(nodeClaims, func(a, b int) bool {
return len(nodeClaims[a].Pods) < len(nodeClaims[b].Pods)
})
for _, nodeClaim := range nodeClaims {
_ = nodeClaim.Add(noTolerationPod, nil)
}
}
}
10 changes: 5 additions & 5 deletions pkg/controllers/provisioning/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package scheduling

import (
"bytes"
"container/heap"
"context"
"fmt"
"sort"
Expand Down Expand Up @@ -273,11 +274,10 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
}
}

// Consider using https://pkg.go.dev/container/heap
sort.Slice(s.newNodeClaims, func(a, b int) bool { return len(s.newNodeClaims[a].Pods) < len(s.newNodeClaims[b].Pods) })

// Pick existing node that we are about to create
for _, nodeClaim := range s.newNodeClaims {
h := NewNodeClaimHeap(s.newNodeClaims)
heap.Init(h)
for h.Len() > 0 {
nodeClaim := heap.Pop(h).(*NodeClaim)
if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err == nil {
return nil
}
Expand Down

0 comments on commit 7e40621

Please sign in to comment.