Add node image information to the cache of the scheduler. #2593

Merged: 1 commit, Dec 14, 2022
28 changes: 25 additions & 3 deletions pkg/scheduler/api/node_info.go
@@ -23,6 +23,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/klog"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"

"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)
@@ -79,6 +80,11 @@ type NodeInfo struct {

// Resource Oversubscription feature: the Oversubscription Resource reported in annotation
OversubscriptionResource *Resource

// ImageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
// state information.
ImageStates map[string]*k8sframework.ImageStateSummary
}

// FutureIdle returns resources that will be idle in the future:
@@ -134,7 +140,8 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {
OversubscriptionResource: EmptyResource(),
Tasks: make(map[TaskID]*TaskInfo),

GPUDevices: make(map[int]*GPUDevice),
GPUDevices: make(map[int]*GPUDevice),
ImageStates: make(map[string]*k8sframework.ImageStateSummary),
}

nodeInfo.setOversubscription(node)
@@ -202,6 +209,8 @@ func (ni *NodeInfo) Clone() *NodeInfo {
klog.V(5).Infof("current Policies : %v", res.NumaSchedulerInfo.Policies)
}

klog.V(5).Infof("imageStates is %v", res.ImageStates)

res.Others = ni.Others
return res
}
@@ -526,8 +535,8 @@ func (ni NodeInfo) String() string {
}

return fmt.Sprintf("Node (%s): allocatable<%v> idle <%v>, used <%v>, releasing <%v>, oversubscribution <%v>, "+
"state <phase %s, reaseon %s>, oversubscributionNode <%v>, offlineJobEvicting <%v>,taints <%v>%s",
ni.Name, ni.Allocatable, ni.Idle, ni.Used, ni.Releasing, ni.OversubscriptionResource, ni.State.Phase, ni.State.Reason, ni.OversubscriptionNode, ni.OfflineJobEvicting, ni.Node.Spec.Taints, tasks)
"state <phase %s, reaseon %s>, oversubscributionNode <%v>, offlineJobEvicting <%v>,taints <%v>%s, imageStates %v",
ni.Name, ni.Allocatable, ni.Idle, ni.Used, ni.Releasing, ni.OversubscriptionResource, ni.State.Phase, ni.State.Reason, ni.OversubscriptionNode, ni.OfflineJobEvicting, ni.Node.Spec.Taints, tasks, ni.ImageStates)
}

// Pods returns all pods running in that node
@@ -628,6 +637,19 @@ func (ni *NodeInfo) getUnhealthyGPUs(node *v1.Node) (unhealthyGPUs []int) {
return
}

// CloneImageSumary returns a copy of the node's image state summaries.
func (ni *NodeInfo) CloneImageSumary() map[string]*k8sframework.ImageStateSummary {
nodeImageStates := make(map[string]*k8sframework.ImageStateSummary)
for imageName, summary := range ni.ImageStates {
newImageSummary := &k8sframework.ImageStateSummary{
Size: summary.Size,
NumNodes: summary.NumNodes,
}
nodeImageStates[imageName] = newImageSummary
}
return nodeImageStates
}

func (cs *CSINodeStatusInfo) Clone() *CSINodeStatusInfo {
newcs := &CSINodeStatusInfo{
CSINodeName: cs.CSINodeName,
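To make the new field concrete, here is a minimal, hypothetical sketch (not part of this PR) of how a consumer could use `NodeInfo.ImageStates` to estimate how much image data a pod would still need to pull on a node. The helper name and the verbatim match between container image and image-state key are illustrative simplifications; production code would normalize image names and tags.

```go
package example

import (
	v1 "k8s.io/api/core/v1"

	api "volcano.sh/volcano/pkg/scheduler/api"
)

// bytesAlreadyPresent sums the sizes of the pod's container images that are
// already on the node according to ImageStates. A larger value means less
// data to pull, making the node a better image-locality candidate.
func bytesAlreadyPresent(ni *api.NodeInfo, pod *v1.Pod) int64 {
	var present int64
	for _, c := range pod.Spec.Containers {
		// Simplification: assumes c.Image appears verbatim among the
		// node's image names.
		if summary, ok := ni.ImageStates[c.Image]; ok {
			present += summary.Size
		}
	}
	return present
}
```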
11 changes: 8 additions & 3 deletions pkg/scheduler/api/node_info_test.go
@@ -22,6 +22,7 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

func nodeInfoEqual(l, r *NodeInfo) bool {
@@ -64,7 +65,8 @@ func TestNodeInfo_AddPod(t *testing.T) {
"c1/p1": NewTaskInfo(case01Pod1),
"c1/p2": NewTaskInfo(case01Pod2),
},
GPUDevices: make(map[int]*GPUDevice),
GPUDevices: make(map[int]*GPUDevice),
ImageStates: make(map[string]*k8sframework.ImageStateSummary),
},
},
{
@@ -85,6 +87,7 @@
State: NodeState{Phase: Ready},
Tasks: map[TaskID]*TaskInfo{},
GPUDevices: make(map[int]*GPUDevice),
ImageStates: make(map[string]*k8sframework.ImageStateSummary),
},
expectedFailure: true,
},
@@ -146,7 +149,8 @@ func TestNodeInfo_RemovePod(t *testing.T) {
"c1/p1": NewTaskInfo(case01Pod1),
"c1/p3": NewTaskInfo(case01Pod3),
},
GPUDevices: make(map[int]*GPUDevice),
GPUDevices: make(map[int]*GPUDevice),
ImageStates: make(map[string]*k8sframework.ImageStateSummary),
},
},
}
@@ -208,7 +212,8 @@ func TestNodeInfo_SetNode(t *testing.T) {
"c1/p2": NewTaskInfo(case01Pod2),
"c1/p3": NewTaskInfo(case01Pod3),
},
GPUDevices: make(map[int]*GPUDevice),
GPUDevices: make(map[int]*GPUDevice),
ImageStates: make(map[string]*k8sframework.ImageStateSummary),
},
},
}
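One property the tests above do not cover, sketched here as a hypothetical addition under the same package and imports, is that `CloneImageSumary` returns fresh copies, so mutating a snapshot cannot corrupt the cached node state:

```go
func TestNodeInfo_CloneImageSumary(t *testing.T) {
	ni := &NodeInfo{
		ImageStates: map[string]*k8sframework.ImageStateSummary{
			"nginx:latest": {Size: 100, NumNodes: 2},
		},
	}

	cloned := ni.CloneImageSumary()
	cloned["nginx:latest"].NumNodes = 5 // mutate the copy only

	if got := ni.ImageStates["nginx:latest"].NumNodes; got != 2 {
		t.Errorf("clone aliases the original summary: NumNodes = %d, want 2", got)
	}
}
```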
22 changes: 22 additions & 0 deletions pkg/scheduler/cache/cache.go
@@ -19,6 +19,7 @@ package cache
import (
"context"
"fmt"
"k8s.io/kubernetes/pkg/scheduler/framework"
Review comment (Member): Why do we need to import this package in this file?

Reply (Author): This package is referenced when imageState is used to initialize ImageStateSummary; it is introduced by the createImageStateSummary function added to this cache file.

"os"
"strconv"
"strings"
@@ -35,6 +36,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
infov1 "k8s.io/client-go/informers/core/v1"
@@ -144,6 +146,16 @@ type SchedulerCache struct {
BindFlowChannel chan *schedulingapi.TaskInfo
bindCache []*schedulingapi.TaskInfo
batchNum int

// A map from image name to its imageState.
imageStates map[string]*imageState
}

type imageState struct {
// Size of the image
size int64
// A set of node names for nodes having this image present
nodes sets.String
}

type DefaultBinder struct {
@@ -434,6 +446,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
nodeSelectorLabels: make(map[string]string),
NamespaceCollection: make(map[string]*schedulingapi.NamespaceCollection),
CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo),
imageStates: make(map[string]*imageState),

NodeList: []string{},
}
@@ -1055,6 +1068,7 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
}

snapshot.Nodes[value.Name] = value.Clone()
snapshot.Nodes[value.Name].ImageStates = value.CloneImageSumary()

if value.RevocableZone != "" {
snapshot.RevocableNodes[value.Name] = snapshot.Nodes[value.Name]
@@ -1315,3 +1329,11 @@ func (sc *SchedulerCache) setMetricsData(usageInfo map[string]*schedulingapi.Nod
}
}
}

// createImageStateSummary returns a summarizing snapshot of the given image's state.
func (sc *SchedulerCache) createImageStateSummary(state *imageState) *framework.ImageStateSummary {
return &framework.ImageStateSummary{
Size: state.size,
NumNodes: len(state.nodes),
}
}
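Taken together, the cache now keeps two levels of image bookkeeping: `sc.imageStates` is the cluster-wide index (image name to size plus the set of nodes holding it), and `createImageStateSummary` flattens one entry of that index into the per-node `ImageStateSummary` that snapshots carry. A self-contained sketch of that derivation, with hypothetical data:

```go
package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// imageState mirrors the cache-internal type above: the image size and the
// set of nodes the image is currently present on.
type imageState struct {
	size  int64
	nodes sets.String
}

func main() {
	// Cluster-wide index, as maintained by the scheduler cache.
	index := map[string]*imageState{
		"nginx:latest": {size: 100 << 20, nodes: sets.NewString("node-1", "node-2")},
	}

	// Per-image summary, derived the same way createImageStateSummary does.
	state := index["nginx:latest"]
	summary := &framework.ImageStateSummary{
		Size:     state.size,
		NumNodes: len(state.nodes),
	}
	fmt.Printf("nginx:latest: %d bytes, on %d node(s)\n", summary.Size, summary.NumNodes)
}
```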
59 changes: 58 additions & 1 deletion pkg/scheduler/cache/event_handlers.go
@@ -19,13 +19,15 @@ package cache
import (
"context"
"fmt"
"k8s.io/kubernetes/pkg/scheduler/framework"
"reflect"
"strconv"

v1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
@@ -288,20 +290,75 @@ func (sc *SchedulerCache) DeletePod(obj interface{}) {
klog.V(3).Infof("Deleted pod <%s/%v> from cache.", pod.Namespace, pod.Name)
}

// addNodeImageStates adds the states of the images on the given node to the given nodeInfo and updates the
// imageStates in the scheduler cache. This function assumes the lock on the scheduler cache has been acquired.
func (sc *SchedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulingapi.NodeInfo) {
newSum := make(map[string]*framework.ImageStateSummary)

for _, image := range node.Status.Images {
for _, name := range image.Names {
// update the entry in imageStates
state, ok := sc.imageStates[name]
if !ok {
state = &imageState{
size: image.SizeBytes,
nodes: sets.NewString(node.Name),
}
sc.imageStates[name] = state
Review comment (Member): Where do we use the data from sc.imageStates[name]? Is it enough to store the image data only in nodeInfo?

Reply (Author): sc.imageStates stores the image information for all nodes in the cluster: the image name is the key, and the information about the nodes that hold the image is the value. When a node is added or deleted, sc.imageStates is updated, and the imageStates in the current node's nodeInfo are then refreshed from it. The scheduler scores each node based on the imageStates in that node's nodeInfo.

Review comment (Member): imageStates in nodeInfo is necessary; I want to understand the necessity of sc.imageStates.

Reply (Author, wangyang0616, Dec 9, 2022): sc.imageStates maintains the image distribution of all nodes in the cluster and is updated whenever a node is added or deleted. Without it, all nodeInfo objects would have to be traversed and recalculated on every node add or delete, which hurts performance and efficiency. It also fits naturally in the scheduler cache; no other structure stores cluster-wide imageStates information.
} else {
state.nodes.Insert(node.Name)
}
// create the imageStateSummary for this image
if _, ok := newSum[name]; !ok {
newSum[name] = sc.createImageStateSummary(state)
}
}
}
nodeInfo.ImageStates = newSum
}

// removeNodeImageStates removes the given node record from image entries having the node
// in imageStates cache. After the removal, if any image becomes free, i.e., the image
// is no longer available on any node, the image entry will be removed from imageStates.
func (sc *SchedulerCache) removeNodeImageStates(node *v1.Node) {
if node == nil {
return
}

for _, image := range node.Status.Images {
for _, name := range image.Names {
state, ok := sc.imageStates[name]
if ok {
state.nodes.Delete(node.Name)
if len(state.nodes) == 0 {
// Remove the unused image to make sure the length of
// imageStates represents the total number of different
// images on all nodes
delete(sc.imageStates, name)
}
}
}
}
}

// Assumes that lock is already acquired.
func (sc *SchedulerCache) addNode(node *v1.Node) error {
if sc.Nodes[node.Name] != nil {
sc.Nodes[node.Name].SetNode(node)
sc.removeNodeImageStates(node)
} else {
sc.Nodes[node.Name] = schedulingapi.NewNodeInfo(node)
}
sc.addNodeImageStates(node, sc.Nodes[node.Name])
return nil
}

// Assumes that lock is already acquired.
func (sc *SchedulerCache) updateNode(oldNode, newNode *v1.Node) error {
if sc.Nodes[newNode.Name] != nil {
sc.Nodes[newNode.Name].SetNode(newNode)
sc.removeNodeImageStates(newNode)
sc.addNodeImageStates(newNode, sc.Nodes[newNode.Name])
return nil
}

@@ -322,7 +379,7 @@ func (sc *SchedulerCache) deleteNode(node *v1.Node) error {
klog.Errorf("delete numatopo <%s/%s> failed.", numaInfo.Namespace, numaInfo.Name)
}
}

sc.removeNodeImageStates(node)
delete(sc.Nodes, node.Name)

return nil
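The handlers above keep one invariant: every node add, update, or delete adjusts the cluster-wide index by exactly that node's image list, so no event ever requires traversing all NodeInfo objects (the performance point made in the review thread). A standalone sketch of that incremental bookkeeping, with hypothetical fixtures:

```go
package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

type imageState struct {
	size  int64
	nodes sets.String
}

// addNode mirrors addNodeImageStates: it touches only the images on this node.
func addNode(index map[string]*imageState, node string, images map[string]int64) {
	for name, size := range images {
		if st, ok := index[name]; ok {
			st.nodes.Insert(node)
		} else {
			index[name] = &imageState{size: size, nodes: sets.NewString(node)}
		}
	}
}

// removeNode mirrors removeNodeImageStates: it drops entries whose node set
// becomes empty, so len(index) stays the number of distinct images in use.
func removeNode(index map[string]*imageState, node string, images []string) {
	for _, name := range images {
		if st, ok := index[name]; ok {
			st.nodes.Delete(node)
			if st.nodes.Len() == 0 {
				delete(index, name)
			}
		}
	}
}

func main() {
	index := map[string]*imageState{}
	addNode(index, "node-1", map[string]int64{"nginx:latest": 100 << 20})
	addNode(index, "node-2", map[string]int64{"nginx:latest": 100 << 20})
	fmt.Println(index["nginx:latest"].nodes.Len()) // 2
	removeNode(index, "node-1", []string{"nginx:latest"})
	removeNode(index, "node-2", []string{"nginx:latest"})
	fmt.Println(len(index)) // 0: the image is no longer on any node
}
```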
20 changes: 10 additions & 10 deletions pkg/scheduler/plugins/nodeorder/nodeorder.go
@@ -262,66 +262,66 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
if weight.imageLocalityWeight != 0 {
score, status := imageLocality.Score(context.TODO(), state, task.Pod, node.Name)
if !status.IsSuccess() {
klog.Warningf("Image Locality Priority Failed because of Error: %v", status.AsError())
klog.Warningf("Node: %s, Image Locality Priority Failed because of Error: %v", node.Name, status.AsError())
return 0, status.AsError()
}

// If imageLocalityWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
nodeScore += float64(score) * float64(weight.imageLocalityWeight)
klog.V(4).Infof("Image Locality score: %f", nodeScore)
klog.V(4).Infof("Node: %s, Image Locality score: %f", node.Name, nodeScore)
}

// NodeResourcesLeastAllocated
if weight.leastReqWeight != 0 {
score, status := leastAllocated.Score(context.TODO(), state, task.Pod, node.Name)
if !status.IsSuccess() {
klog.Warningf("Least Allocated Priority Failed because of Error: %v", status.AsError())
klog.Warningf("Node: %s, Least Allocated Priority Failed because of Error: %v", node.Name, status.AsError())
return 0, status.AsError()
}

// If leastReqWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
nodeScore += float64(score) * float64(weight.leastReqWeight)
klog.V(4).Infof("Least Request score: %f", nodeScore)
klog.V(4).Infof("Node: %s, Least Request score: %f", node.Name, nodeScore)
}

// NodeResourcesMostAllocated
if weight.mostReqWeight != 0 {
score, status := mostAllocation.Score(context.TODO(), state, task.Pod, node.Name)
if !status.IsSuccess() {
klog.Warningf("Most Allocated Priority Failed because of Error: %v", status.AsError())
klog.Warningf("Node: %s, Most Allocated Priority Failed because of Error: %v", node.Name, status.AsError())
return 0, status.AsError()
}

// If mostRequestedWeight is provided, host.Score is multiplied with weight, it's 0 by default
nodeScore += float64(score) * float64(weight.mostReqWeight)
klog.V(4).Infof("Most Request score: %f", nodeScore)
klog.V(4).Infof("Node: %s, Most Request score: %f", node.Name, nodeScore)
}

// NodeResourcesBalancedAllocation
if weight.balancedResourceWeight != 0 {
score, status := balancedAllocation.Score(context.TODO(), state, task.Pod, node.Name)
if !status.IsSuccess() {
klog.Warningf("Balanced Resource Allocation Priority Failed because of Error: %v", status.AsError())
klog.Warningf("Node: %s, Balanced Resource Allocation Priority Failed because of Error: %v", node.Name, status.AsError())
return 0, status.AsError()
}

// If balancedResourceWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
nodeScore += float64(score) * float64(weight.balancedResourceWeight)
klog.V(4).Infof("Balanced Request score: %f", nodeScore)
klog.V(4).Infof("Node: %s, Balanced Request score: %f", node.Name, nodeScore)
}

// NodeAffinity
if weight.nodeAffinityWeight != 0 {
score, status := nodeAffinity.Score(context.TODO(), state, task.Pod, node.Name)
if !status.IsSuccess() {
klog.Warningf("Calculate Node Affinity Priority Failed because of Error: %v", status.AsError())
klog.Warningf("Node: %s, Calculate Node Affinity Priority Failed because of Error: %v", node.Name, status.AsError())
return 0, status.AsError()
}

// TODO: should we normalize the score
// If nodeAffinityWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
nodeScore += float64(score) * float64(weight.nodeAffinityWeight)
klog.V(4).Infof("Node Affinity score: %f", nodeScore)
klog.V(4).Infof("Node: %s, Node Affinity score: %f", node.Name, nodeScore)
}

klog.V(4).Infof("Total Score for task %s/%s on node %s is: %f", task.Namespace, task.Name, node.Name, nodeScore)
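For reference, the scoring loop these log changes touch follows one pattern throughout: each enabled upstream plugin's raw score is multiplied by its configured weight and accumulated into the node's total. A stripped-down sketch of that reduction (plugin names and weights are illustrative, not Volcano's configuration keys):

```go
package example

import "fmt"

// weightedScore condenses the pattern above: skip disabled plugins
// (weight 0) and accumulate rawScore * weight into the node's total.
func weightedScore(raw map[string]int64, weights map[string]int) float64 {
	var total float64
	for plugin, score := range raw {
		if w := weights[plugin]; w != 0 {
			total += float64(score) * float64(w)
		}
	}
	return total
}

func main() {
	raw := map[string]int64{"imagelocality": 40, "leastallocated": 60}
	weights := map[string]int{"imagelocality": 1, "leastallocated": 2}
	fmt.Println(weightedScore(raw, weights)) // 40*1 + 60*2 = 160
}
```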
2 changes: 2 additions & 0 deletions pkg/scheduler/plugins/util/util.go
@@ -239,6 +239,8 @@ func GenerateNodeMapAndSlice(nodes map[string]*api.NodeInfo) map[string]*schedul
nodeInfo := schedulernodeinfo.NewNodeInfo(node.Pods()...)
nodeInfo.SetNode(node.Node)
nodeMap[node.Name] = nodeInfo
// add the node's image state summaries into the k8s framework nodeInfo
nodeMap[node.Name].ImageStates = node.CloneImageSumary()
}
return nodeMap
}
Expand Down