Merge pull request #2593 from wangyang0616/bugfix_2496
Add node image information to the cache of the scheduler.
volcano-sh-bot authored Dec 14, 2022
2 parents 008c341 + 4e77863 commit 35ab73f
Showing 6 changed files with 125 additions and 17 deletions.
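
Taken together, the diffs below thread node image information through three layers: the cache's event handlers read node.Status.Images into an internal image index, each Volcano NodeInfo carries a per-image summary, and the snapshot hands those summaries to scoring plugins such as image locality. A minimal sketch of the two data shapes involved, using the same k8s API and kube-scheduler framework types the diffs import (the helper name shapes, the image name, and the size are illustrative only):

package sketch

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// shapes contrasts what the kubelet reports in NodeStatus with the condensed
// per-image summary this commit stores for each node in the scheduler cache.
func shapes() *framework.ImageStateSummary {
	// As reported by the kubelet for one image present on one node.
	reported := v1.ContainerImage{
		Names:     []string{"docker.io/library/nginx:1.23"},
		SizeBytes: 142 * 1024 * 1024,
	}
	// As cached by the scheduler: the size plus how many nodes hold the image.
	return &framework.ImageStateSummary{
		Size:     reported.SizeBytes,
		NumNodes: 1,
	}
}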
28 changes: 25 additions & 3 deletions pkg/scheduler/api/node_info.go
@@ -23,6 +23,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/klog"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"

"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)
@@ -79,6 +80,11 @@ type NodeInfo struct {

// Resource Oversubscription feature: the Oversubscription Resource reported in annotation
OversubscriptionResource *Resource

// ImageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
// state information.
ImageStates map[string]*k8sframework.ImageStateSummary
}

// FutureIdle returns resources that will be idle in the future:
@@ -134,7 +140,8 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {
OversubscriptionResource: EmptyResource(),
Tasks: make(map[TaskID]*TaskInfo),

GPUDevices: make(map[int]*GPUDevice),
GPUDevices: make(map[int]*GPUDevice),
ImageStates: make(map[string]*k8sframework.ImageStateSummary),
}

nodeInfo.setOversubscription(node)
@@ -202,6 +209,8 @@ func (ni *NodeInfo) Clone() *NodeInfo {
klog.V(5).Infof("current Policies : %v", res.NumaSchedulerInfo.Policies)
}

klog.V(5).Infof("imageStates is %v", res.ImageStates)

res.Others = ni.Others
return res
}
@@ -526,8 +535,8 @@ func (ni NodeInfo) String() string {
}

return fmt.Sprintf("Node (%s): allocatable<%v> idle <%v>, used <%v>, releasing <%v>, oversubscribution <%v>, "+
"state <phase %s, reaseon %s>, oversubscributionNode <%v>, offlineJobEvicting <%v>,taints <%v>%s",
ni.Name, ni.Allocatable, ni.Idle, ni.Used, ni.Releasing, ni.OversubscriptionResource, ni.State.Phase, ni.State.Reason, ni.OversubscriptionNode, ni.OfflineJobEvicting, ni.Node.Spec.Taints, tasks)
"state <phase %s, reaseon %s>, oversubscributionNode <%v>, offlineJobEvicting <%v>,taints <%v>%s, imageStates %v",
ni.Name, ni.Allocatable, ni.Idle, ni.Used, ni.Releasing, ni.OversubscriptionResource, ni.State.Phase, ni.State.Reason, ni.OversubscriptionNode, ni.OfflineJobEvicting, ni.Node.Spec.Taints, tasks, ni.ImageStates)
}

// Pods returns all pods running in that node
@@ -628,6 +637,19 @@ func (ni *NodeInfo) getUnhealthyGPUs(node *v1.Node) (unhealthyGPUs []int) {
return
}

// CloneImageSumary returns a deep copy of the node's image state summaries.
func (ni *NodeInfo) CloneImageSumary() map[string]*k8sframework.ImageStateSummary {
nodeImageStates := make(map[string]*k8sframework.ImageStateSummary)
for imageName, summary := range ni.ImageStates {
newImageSummary := &k8sframework.ImageStateSummary{
Size: summary.Size,
NumNodes: summary.NumNodes,
}
nodeImageStates[imageName] = newImageSummary
}
return nodeImageStates
}

func (cs *CSINodeStatusInfo) Clone() *CSINodeStatusInfo {
newcs := &CSINodeStatusInfo{
CSINodeName: cs.CSINodeName,
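
The new CloneImageSumary copies each ImageStateSummary into a fresh map rather than sharing references, so the view taken for a scheduling cycle does not move when the live cache is updated afterwards. A minimal sketch of that property (the volcano.sh/volcano import path, the helper name, and the values are assumptions for illustration):

package sketch

import (
	"fmt"

	k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"

	api "volcano.sh/volcano/pkg/scheduler/api"
)

func cloneIsIndependent() {
	ni := &api.NodeInfo{
		ImageStates: map[string]*k8sframework.ImageStateSummary{
			"docker.io/library/nginx:1.23": {Size: 142 * 1024 * 1024, NumNodes: 3},
		},
	}
	snapshotView := ni.CloneImageSumary()

	// The live cache keeps changing after the clone is taken...
	ni.ImageStates["docker.io/library/nginx:1.23"].NumNodes = 4

	// ...but the cloned summaries used by the scheduling cycle stay put.
	fmt.Println(snapshotView["docker.io/library/nginx:1.23"].NumNodes) // 3
}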
11 changes: 8 additions & 3 deletions pkg/scheduler/api/node_info_test.go
@@ -22,6 +22,7 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

func nodeInfoEqual(l, r *NodeInfo) bool {
@@ -64,7 +65,8 @@ func TestNodeInfo_AddPod(t *testing.T) {
"c1/p1": NewTaskInfo(case01Pod1),
"c1/p2": NewTaskInfo(case01Pod2),
},
GPUDevices: make(map[int]*GPUDevice),
GPUDevices: make(map[int]*GPUDevice),
ImageStates: make(map[string]*k8sframework.ImageStateSummary),
},
},
{
@@ -85,6 +87,7 @@
State: NodeState{Phase: Ready},
Tasks: map[TaskID]*TaskInfo{},
GPUDevices: make(map[int]*GPUDevice),
ImageStates: make(map[string]*k8sframework.ImageStateSummary),
},
expectedFailure: true,
},
@@ -146,7 +149,8 @@ func TestNodeInfo_RemovePod(t *testing.T) {
"c1/p1": NewTaskInfo(case01Pod1),
"c1/p3": NewTaskInfo(case01Pod3),
},
GPUDevices: make(map[int]*GPUDevice),
GPUDevices: make(map[int]*GPUDevice),
ImageStates: make(map[string]*k8sframework.ImageStateSummary),
},
},
}
@@ -208,7 +212,8 @@ func TestNodeInfo_SetNode(t *testing.T) {
"c1/p2": NewTaskInfo(case01Pod2),
"c1/p3": NewTaskInfo(case01Pod3),
},
GPUDevices: make(map[int]*GPUDevice),
GPUDevices: make(map[int]*GPUDevice),
ImageStates: make(map[string]*k8sframework.ImageStateSummary),
},
},
}
22 changes: 22 additions & 0 deletions pkg/scheduler/cache/cache.go
@@ -19,6 +19,7 @@ package cache
import (
"context"
"fmt"
"k8s.io/kubernetes/pkg/scheduler/framework"
"os"
"strconv"
"strings"
@@ -35,6 +36,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
infov1 "k8s.io/client-go/informers/core/v1"
@@ -144,6 +146,16 @@ type SchedulerCache struct {
BindFlowChannel chan *schedulingapi.TaskInfo
bindCache []*schedulingapi.TaskInfo
batchNum int

// A map from image name to its imageState.
imageStates map[string]*imageState
}

type imageState struct {
// Size of the image
size int64
// A set of node names for nodes having this image present
nodes sets.String
}

type DefaultBinder struct {
@@ -434,6 +446,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
nodeSelectorLabels: make(map[string]string),
NamespaceCollection: make(map[string]*schedulingapi.NamespaceCollection),
CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo),
imageStates: make(map[string]*imageState),

NodeList: []string{},
}
@@ -1055,6 +1068,7 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
}

snapshot.Nodes[value.Name] = value.Clone()
snapshot.Nodes[value.Name].ImageStates = value.CloneImageSumary()

if value.RevocableZone != "" {
snapshot.RevocableNodes[value.Name] = snapshot.Nodes[value.Name]
@@ -1331,3 +1345,11 @@ func (sc *SchedulerCache) setMetricsData(usageInfo map[string]*schedulingapi.Nod
}
}
}

// createImageStateSummary returns a summarizing snapshot of the given image's state.
func (sc *SchedulerCache) createImageStateSummary(state *imageState) *framework.ImageStateSummary {
return &framework.ImageStateSummary{
Size: state.size,
NumNodes: len(state.nodes),
}
}
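
With the snapshot now carrying ImageStates, consumers of the ClusterInfo returned by Snapshot() can ask whether a node already holds an image and how widely it is replicated. A small sketch of that read path (the node name, image name, and helper are illustrative; a real snapshot would come from SchedulerCache.Snapshot()):

package sketch

import (
	"fmt"

	schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
)

func readImageState(snapshot *schedulingapi.ClusterInfo) {
	node, ok := snapshot.Nodes["node-1"]
	if !ok {
		return
	}
	if s, found := node.ImageStates["docker.io/library/redis:7"]; found {
		// Size is in bytes; NumNodes is how many nodes currently report the image.
		fmt.Printf("image cached locally: %d bytes, present on %d node(s)\n", s.Size, s.NumNodes)
	}
}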
59 changes: 58 additions & 1 deletion pkg/scheduler/cache/event_handlers.go
@@ -19,13 +19,15 @@ package cache
import (
"context"
"fmt"
"k8s.io/kubernetes/pkg/scheduler/framework"
"reflect"
"strconv"

v1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
@@ -288,20 +290,75 @@ func (sc *SchedulerCache) DeletePod(obj interface{}) {
klog.V(3).Infof("Deleted pod <%s/%v> from cache.", pod.Namespace, pod.Name)
}

// addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in
// scheduler cache. This function assumes the lock to scheduler cache has been acquired.
func (sc *SchedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulingapi.NodeInfo) {
newSum := make(map[string]*framework.ImageStateSummary)

for _, image := range node.Status.Images {
for _, name := range image.Names {
// update the entry in imageStates
state, ok := sc.imageStates[name]
if !ok {
state = &imageState{
size: image.SizeBytes,
nodes: sets.NewString(node.Name),
}
sc.imageStates[name] = state
} else {
state.nodes.Insert(node.Name)
}
// create the imageStateSummary for this image
if _, ok := newSum[name]; !ok {
newSum[name] = sc.createImageStateSummary(state)
}
}
}
nodeInfo.ImageStates = newSum
}

// removeNodeImageStates removes the given node record from image entries having the node
// in imageStates cache. After the removal, if any image becomes free, i.e., the image
// is no longer available on any node, the image entry will be removed from imageStates.
func (sc *SchedulerCache) removeNodeImageStates(node *v1.Node) {
if node == nil {
return
}

for _, image := range node.Status.Images {
for _, name := range image.Names {
state, ok := sc.imageStates[name]
if ok {
state.nodes.Delete(node.Name)
if len(state.nodes) == 0 {
// Remove the unused image to make sure the length of
// imageStates represents the total number of different
// images on all nodes
delete(sc.imageStates, name)
}
}
}
}
}

// Assumes that lock is already acquired.
func (sc *SchedulerCache) addNode(node *v1.Node) error {
if sc.Nodes[node.Name] != nil {
sc.Nodes[node.Name].SetNode(node)
sc.removeNodeImageStates(node)
} else {
sc.Nodes[node.Name] = schedulingapi.NewNodeInfo(node)
}
sc.addNodeImageStates(node, sc.Nodes[node.Name])
return nil
}

// Assumes that lock is already acquired.
func (sc *SchedulerCache) updateNode(oldNode, newNode *v1.Node) error {
if sc.Nodes[newNode.Name] != nil {
sc.Nodes[newNode.Name].SetNode(newNode)
sc.removeNodeImageStates(newNode)
sc.addNodeImageStates(newNode, sc.Nodes[newNode.Name])
return nil
}

@@ -322,7 +379,7 @@ func (sc *SchedulerCache) deleteNode(node *v1.Node) error {
klog.Errorf("delete numatopo <%s/%s> failed.", numaInfo.Namespace, numaInfo.Name)
}
}

sc.removeNodeImageStates(node)
delete(sc.Nodes, node.Name)

return nil
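
addNodeImageStates and removeNodeImageStates together maintain an index from image name to the set of nodes holding it, and drop an entry once no node reports the image, which keeps len(imageStates) equal to the number of distinct images in the cluster. A self-contained sketch of that invariant, using a local mirror of the unexported imageState type rather than the cache's own (type, helper, and names are illustrative):

package sketch

import "k8s.io/apimachinery/pkg/util/sets"

// imageEntry mirrors the cache's unexported imageState for illustration only.
type imageEntry struct {
	size  int64
	nodes sets.String
}

func removeNode(index map[string]*imageEntry, image, nodeName string) {
	entry, ok := index[image]
	if !ok {
		return
	}
	entry.nodes.Delete(nodeName)
	if entry.nodes.Len() == 0 {
		// Keep len(index) equal to the number of distinct images on all nodes.
		delete(index, image)
	}
}

func example() int {
	index := map[string]*imageEntry{
		"docker.io/library/nginx:1.23": {size: 142 * 1024 * 1024, nodes: sets.NewString("node-1", "node-2")},
	}
	removeNode(index, "docker.io/library/nginx:1.23", "node-1") // still held by node-2
	removeNode(index, "docker.io/library/nginx:1.23", "node-2") // last holder gone, entry deleted
	return len(index)                                           // 0
}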
20 changes: 10 additions & 10 deletions pkg/scheduler/plugins/nodeorder/nodeorder.go
@@ -262,66 +262,66 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
if weight.imageLocalityWeight != 0 {
score, status := imageLocality.Score(context.TODO(), state, task.Pod, node.Name)
if !status.IsSuccess() {
klog.Warningf("Image Locality Priority Failed because of Error: %v", status.AsError())
klog.Warningf("Node: %s, Image Locality Priority Failed because of Error: %v", node.Name, status.AsError())
return 0, status.AsError()
}

// If imageLocalityWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
nodeScore += float64(score) * float64(weight.imageLocalityWeight)
klog.V(4).Infof("Image Locality score: %f", nodeScore)
klog.V(4).Infof("Node: %s, Image Locality score: %f", node.Name, nodeScore)
}

// NodeResourcesLeastAllocated
if weight.leastReqWeight != 0 {
score, status := leastAllocated.Score(context.TODO(), state, task.Pod, node.Name)
if !status.IsSuccess() {
klog.Warningf("Least Allocated Priority Failed because of Error: %v", status.AsError())
klog.Warningf("Node: %s, Least Allocated Priority Failed because of Error: %v", node.Name, status.AsError())
return 0, status.AsError()
}

// If leastReqWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
nodeScore += float64(score) * float64(weight.leastReqWeight)
klog.V(4).Infof("Least Request score: %f", nodeScore)
klog.V(4).Infof("Node: %s, Least Request score: %f", node.Name, nodeScore)
}

// NodeResourcesMostAllocated
if weight.mostReqWeight != 0 {
score, status := mostAllocation.Score(context.TODO(), state, task.Pod, node.Name)
if !status.IsSuccess() {
klog.Warningf("Most Allocated Priority Failed because of Error: %v", status.AsError())
klog.Warningf("Node: %s, Most Allocated Priority Failed because of Error: %v", node.Name, status.AsError())
return 0, status.AsError()
}

// If mostRequestedWeight is provided, host.Score is multiplied with weight, it's 0 by default
nodeScore += float64(score) * float64(weight.mostReqWeight)
klog.V(4).Infof("Most Request score: %f", nodeScore)
klog.V(4).Infof("Node: %s, Most Request score: %f", node.Name, nodeScore)
}

// NodeResourcesBalancedAllocation
if weight.balancedResourceWeight != 0 {
score, status := balancedAllocation.Score(context.TODO(), state, task.Pod, node.Name)
if !status.IsSuccess() {
klog.Warningf("Balanced Resource Allocation Priority Failed because of Error: %v", status.AsError())
klog.Warningf("Node: %s, Balanced Resource Allocation Priority Failed because of Error: %v", node.Name, status.AsError())
return 0, status.AsError()
}

// If balancedResourceWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
nodeScore += float64(score) * float64(weight.balancedResourceWeight)
klog.V(4).Infof("Balanced Request score: %f", nodeScore)
klog.V(4).Infof("Node: %s, Balanced Request score: %f", node.Name, nodeScore)
}

// NodeAffinity
if weight.nodeAffinityWeight != 0 {
score, status := nodeAffinity.Score(context.TODO(), state, task.Pod, node.Name)
if !status.IsSuccess() {
klog.Warningf("Calculate Node Affinity Priority Failed because of Error: %v", status.AsError())
klog.Warningf("Node: %s, Calculate Node Affinity Priority Failed because of Error: %v", node.Name, status.AsError())
return 0, status.AsError()
}

// TODO: should we normalize the score
// If nodeAffinityWeight is provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
nodeScore += float64(score) * float64(weight.nodeAffinityWeight)
klog.V(4).Infof("Node Affinity score: %f", nodeScore)
klog.V(4).Infof("Node: %s, Node Affinity score: %f", node.Name, nodeScore)
}

klog.V(4).Infof("Total Score for task %s/%s on node %s is: %f", task.Namespace, task.Name, node.Name, nodeScore)
2 changes: 2 additions & 0 deletions pkg/scheduler/plugins/util/util.go
@@ -239,6 +239,8 @@ func GenerateNodeMapAndSlice(nodes map[string]*api.NodeInfo) map[string]*schedul
nodeInfo := schedulernodeinfo.NewNodeInfo(node.Pods()...)
nodeInfo.SetNode(node.Node)
nodeMap[node.Name] = nodeInfo
// add imagestate into nodeinfo
nodeMap[node.Name].ImageStates = node.CloneImageSumary()
}
return nodeMap
}
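
GenerateNodeMapAndSlice now copies the Volcano summaries into the upstream framework.NodeInfo that in-tree plugins score against, which is what lets the image locality scoring wired up in nodeorder.go above prefer nodes that already hold a pod's images. A minimal sketch of the structure handed to those plugins (the helper name, node name, image name, and size are illustrative):

package sketch

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func pluginNodeInfo() *framework.NodeInfo {
	ni := framework.NewNodeInfo() // no pods needed for this sketch
	ni.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}})
	// The same map shape that node.CloneImageSumary() returns in GenerateNodeMapAndSlice.
	ni.ImageStates = map[string]*framework.ImageStateSummary{
		"docker.io/library/nginx:1.23": {Size: 142 * 1024 * 1024, NumNodes: 2},
	}
	return ni
}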
