From c5ea9c4c077611e1f5250c3532a233825049b2b5 Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Mon, 10 Jun 2019 14:54:51 +0800 Subject: [PATCH] Ignore nodes if out of syc. Signed-off-by: Da K. Ma --- pkg/scheduler/api/node_info.go | 88 +++++++++++++++++++++++------ pkg/scheduler/api/node_info_test.go | 2 + pkg/scheduler/api/types.go | 21 +++++++ pkg/scheduler/cache/cache.go | 4 ++ 4 files changed, 99 insertions(+), 16 deletions(-) diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go index 16257d414..679818db5 100644 --- a/pkg/scheduler/api/node_info.go +++ b/pkg/scheduler/api/node_info.go @@ -19,6 +19,8 @@ package api import ( "fmt" + "github.com/golang/glog" + v1 "k8s.io/api/core/v1" ) @@ -27,6 +29,9 @@ type NodeInfo struct { Name string Node *v1.Node + // The state of node + State NodeState + // The releasing resource on that node Releasing *Resource // The idle resource on that node @@ -44,10 +49,18 @@ type NodeInfo struct { Other interface{} } +// NodeState defines the current state of node. +type NodeState struct { + Phase NodePhase + Reason string +} + // NewNodeInfo is used to create new nodeInfo object func NewNodeInfo(node *v1.Node) *NodeInfo { + var ni *NodeInfo + if node == nil { - return &NodeInfo{ + ni = &NodeInfo{ Releasing: EmptyResource(), Idle: EmptyResource(), Used: EmptyResource(), @@ -57,21 +70,25 @@ func NewNodeInfo(node *v1.Node) *NodeInfo { Tasks: make(map[TaskID]*TaskInfo), } - } - - return &NodeInfo{ - Name: node.Name, - Node: node, + } else { + ni = &NodeInfo{ + Name: node.Name, + Node: node, - Releasing: EmptyResource(), - Idle: NewResource(node.Status.Allocatable), - Used: EmptyResource(), + Releasing: EmptyResource(), + Idle: NewResource(node.Status.Allocatable), + Used: EmptyResource(), - Allocatable: NewResource(node.Status.Allocatable), - Capability: NewResource(node.Status.Capacity), + Allocatable: NewResource(node.Status.Allocatable), + Capability: NewResource(node.Status.Capacity), - Tasks: make(map[TaskID]*TaskInfo), + Tasks: make(map[TaskID]*TaskInfo), + } } + + ni.setNodeState(node) + + return ni } // Clone used to clone nodeInfo Object @@ -85,8 +102,47 @@ func (ni *NodeInfo) Clone() *NodeInfo { return res } +// Ready returns whether node is ready for scheduling +func (ni *NodeInfo) Ready() bool { + return ni.State.Phase == Ready +} + +func (ni *NodeInfo) setNodeState(node *v1.Node) { + // If node is nil, the node is un-initialized in cache + if node == nil { + ni.State = NodeState{ + Phase: NotReady, + Reason: "UnInitialized", + } + return + } + + // set NodeState according to resources + if !ni.Used.LessEqual(NewResource(node.Status.Allocatable)) { + ni.State = NodeState{ + Phase: NotReady, + Reason: "OutOfSync", + } + return + } + + // Node is ready (ignore node conditions because of taint/toleration) + ni.State = NodeState{ + Phase: Ready, + Reason: "", + } +} + // SetNode sets kubernetes node object to nodeInfo object func (ni *NodeInfo) SetNode(node *v1.Node) { + ni.setNodeState(node) + + if !ni.Ready() { + glog.Warningf("Failed to set node info, phase: %s, reason: %s", + ni.State.Phase, ni.State.Reason) + return + } + ni.Name = node.Name ni.Node = node @@ -176,16 +232,16 @@ func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error { // String returns nodeInfo details in string format func (ni NodeInfo) String() string { - res := "" + tasks := "" i := 0 for _, task := range ni.Tasks { - res = res + fmt.Sprintf("\n\t %d: %v", i, task) + tasks = tasks + fmt.Sprintf("\n\t %d: %v", i, task) i++ } - return fmt.Sprintf("Node (%s): idle <%v>, used <%v>, releasing <%v>, taints <%v>%s", - ni.Name, ni.Idle, ni.Used, ni.Releasing, ni.Node.Spec.Taints, res) + return fmt.Sprintf("Node (%s): idle <%v>, used <%v>, releasing <%v>, state , taints <%v>%s", + ni.Name, ni.Idle, ni.Used, ni.Releasing, ni.State.Phase, ni.State.Reason, ni.Node.Spec.Taints, tasks) } diff --git a/pkg/scheduler/api/node_info_test.go b/pkg/scheduler/api/node_info_test.go index 49515cc26..ae08d4520 100644 --- a/pkg/scheduler/api/node_info_test.go +++ b/pkg/scheduler/api/node_info_test.go @@ -56,6 +56,7 @@ func TestNodeInfo_AddPod(t *testing.T) { Releasing: EmptyResource(), Allocatable: buildResource("8000m", "10G"), Capability: buildResource("8000m", "10G"), + State: NodeState{Phase: Ready}, Tasks: map[TaskID]*TaskInfo{ "c1/p1": NewTaskInfo(case01Pod1), "c1/p2": NewTaskInfo(case01Pod2), @@ -106,6 +107,7 @@ func TestNodeInfo_RemovePod(t *testing.T) { Releasing: EmptyResource(), Allocatable: buildResource("8000m", "10G"), Capability: buildResource("8000m", "10G"), + State: NodeState{Phase: Ready}, Tasks: map[TaskID]*TaskInfo{ "c1/p1": NewTaskInfo(case01Pod1), "c1/p3": NewTaskInfo(case01Pod3), diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index 97a583df6..a88439e45 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -78,6 +78,27 @@ func (ts TaskStatus) String() string { } } +// NodePhase defines the phase of node +type NodePhase int + +const ( + // Ready means the node is ready for scheduling + Ready NodePhase = 1 << iota + // NotReady means the node is not ready for scheduling + NotReady +) + +func (np NodePhase) String() string { + switch np { + case Ready: + return "Ready" + case NotReady: + return "NotReady" + } + + return "Unknown" +} + // validateStatusUpdate validates whether the status transfer is valid. func validateStatusUpdate(oldStatus, newStatus TaskStatus) error { return nil diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 96dcc3c37..322d3b4d0 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -549,6 +549,10 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { } for _, value := range sc.Nodes { + if !value.Ready() { + continue + } + snapshot.Nodes[value.Name] = value.Clone() }