Skip to content

Commit

Permalink
refactor: migrate appgroup and net topo client to ctrl runtime
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Zhang <kweizh@gmail.com>
  • Loading branch information
zwpaper committed Oct 31, 2023
1 parent 40c0fee commit 9096af8
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 151 deletions.
166 changes: 107 additions & 59 deletions pkg/networkaware/networkoverhead/networkoverhead.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,21 @@ import (
"math"
"sort"

v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
corelisters "k8s.io/client-go/listers/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

"sigs.k8s.io/controller-runtime/pkg/client"

pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config"
networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util"

agv1alpha1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1"
aglisters "github.com/diktyo-io/appgroup-api/pkg/generated/listers/appgroup/v1alpha1"
ntv1alpha1 "github.com/diktyo-io/networktopology-api/pkg/apis/networktopology/v1alpha1"
ntlisters "github.com/diktyo-io/networktopology-api/pkg/generated/listers/networktopology/v1alpha1"
appgroupv1a1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1"
nettopov1a1 "github.com/diktyo-io/networktopology-api/pkg/apis/networktopology/v1alpha1"
)

var _ framework.PreFilterPlugin = &NetworkOverhead{}
Expand All @@ -61,10 +62,9 @@ const (

// NetworkOverhead filters and scores nodes based on the Pod's AppGroup requirements: the MaxNetworkCosts requirements among Pods with dependencies.
type NetworkOverhead struct {
client.Client

handle framework.Handle
podLister corelisters.PodLister
agLister aglisters.AppGroupLister
ntLister ntlisters.NetworkTopologyLister
namespaces []string
weightsName string
ntName string
Expand All @@ -79,13 +79,13 @@ type PreFilterState struct {
agName string

// AppGroup CR
appGroup *agv1alpha1.AppGroup
appGroup *appgroupv1a1.AppGroup

// NetworkTopology CR
networkTopology *ntv1alpha1.NetworkTopology
networkTopology *nettopov1a1.NetworkTopology

// Dependency List of the given pod
dependencyList []agv1alpha1.DependenciesInfo
dependencyList []appgroupv1a1.DependenciesInfo

// Pods already scheduled based on the dependency list
scheduledList networkawareutil.ScheduledList
Expand Down Expand Up @@ -136,21 +136,23 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
return nil, err
}

agLister, err := networkawareutil.InitAppGroupInformer(handle.KubeConfig())
if err != nil {
return nil, err
}
scheme := runtime.NewScheme()

utilruntime.Must(clientgoscheme.AddToScheme(scheme))

ntLister, err := networkawareutil.InitNetworkTopologyInformer(handle.KubeConfig())
utilruntime.Must(appgroupv1a1.AddToScheme(scheme))
utilruntime.Must(nettopov1a1.AddToScheme(scheme))

client, err := client.New(handle.KubeConfig(), client.Options{
Scheme: scheme,
})
if err != nil {
return nil, err
}

no := &NetworkOverhead{
Client: client,
handle: handle,
podLister: handle.SharedInformerFactory().Core().V1().Pods().Lister(),
agLister: agLister,
ntLister: ntLister,
namespaces: args.Namespaces,
weightsName: args.WeightsName,
ntName: args.NetworkTopologyName,
Expand All @@ -165,7 +167,7 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
// 4. Update cost map of all nodes
// 5. Get number of satisfied and violated dependencies
// 6. Get final cost of the given node to be used in the score plugin
func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) (*framework.PreFilterResult, *framework.Status) {
// Init PreFilter State
preFilterState := &PreFilterState{
scoreEqually: true,
Expand Down Expand Up @@ -197,24 +199,29 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle
return nil, framework.NewStatus(framework.Success, "Pod has no dependencies, return")
}

// Get pods from lister
selector := labels.Set(map[string]string{agv1alpha1.AppGroupLabel: agName}).AsSelector()
pods, err := no.podLister.List(selector)
if err != nil {
return nil, framework.NewStatus(framework.Success, "Error while returning pods from appGroup, return")
podList := &corev1.PodList{}
if err := no.List(ctx, podList,
client.MatchingLabelsSelector{
Selector: labels.Set(map[string]string{
appgroupv1a1.AppGroupLabel: agName,
}).AsSelector(),
}); err != nil {
klog.ErrorS(err, "List pods for group failed")
return nil, framework.NewStatus(
framework.Success, "Error while returning pods from appGroup, return")
}
pods := podList.Items

// Return if pods are not yet allocated for the AppGroup...
if pods == nil {
if pods == nil || len(pods) == 0 {
return nil, framework.NewStatus(framework.Success, "No pods yet allocated, return")
}

// Pods already scheduled: Get Scheduled List (Deployment name, replicaID, hostname)
scheduledList := networkawareutil.GetScheduledList(pods)

// Check if scheduledList is empty...
if scheduledList == nil {
klog.ErrorS(err, "Scheduled list is empty, return")
if scheduledList == nil || len(scheduledList) == 0 {
klog.ErrorS(nil, "Scheduled list is empty, return")
return nil, framework.NewStatus(framework.Success, "Scheduled list is empty, return")
}

Expand All @@ -238,7 +245,10 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle
// retrieve region and zone labels
region := networkawareutil.GetNodeRegion(nodeInfo.Node())
zone := networkawareutil.GetNodeZone(nodeInfo.Node())
klog.V(6).InfoS("Node info", "name", nodeInfo.Node().Name, "region", region, "zone", zone)
klog.V(6).InfoS("Node info",
"name", nodeInfo.Node().Name,
"region", region,
"zone", zone)

// Create map for cost / destinations. Search for requirements faster...
costMap := make(map[networkawareutil.CostKey]int64)
Expand Down Expand Up @@ -295,18 +305,29 @@ func (no *NetworkOverhead) PreFilterExtensions() framework.PreFilterExtensions {

// AddPod from pre-computed data in cycleState.
// no current need for the NetworkOverhead plugin
func (no *NetworkOverhead) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
func (no *NetworkOverhead) AddPod(ctx context.Context,
cycleState *framework.CycleState,
podToSchedule *corev1.Pod,
podToAdd *framework.PodInfo,
nodeInfo *framework.NodeInfo) *framework.Status {
return framework.NewStatus(framework.Success, "")
}

// RemovePod from pre-computed data in cycleState.
// no current need for the NetworkOverhead plugin
func (no *NetworkOverhead) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
func (no *NetworkOverhead) RemovePod(ctx context.Context,
cycleState *framework.CycleState,
podToSchedule *corev1.Pod,
podToRemove *framework.PodInfo,
nodeInfo *framework.NodeInfo) *framework.Status {
return framework.NewStatus(framework.Success, "")
}

// Filter : evaluate if node can respect maxNetworkCost requirements
func (no *NetworkOverhead) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
func (no *NetworkOverhead) Filter(ctx context.Context,
cycleState *framework.CycleState,
pod *corev1.Pod,
nodeInfo *framework.NodeInfo) *framework.Status {
if nodeInfo.Node() == nil {
return framework.NewStatus(framework.Error, "node not found")
}
Expand Down Expand Up @@ -338,7 +359,10 @@ func (no *NetworkOverhead) Filter(ctx context.Context, cycleState *framework.Cyc
}

// Score : evaluate score for a node
func (no *NetworkOverhead) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
func (no *NetworkOverhead) Score(ctx context.Context,
cycleState *framework.CycleState,
pod *corev1.Pod,
nodeName string) (int64, *framework.Status) {
score := framework.MinNodeScore

// Get PreFilterState
Expand All @@ -360,7 +384,10 @@ func (no *NetworkOverhead) Score(ctx context.Context, cycleState *framework.Cycl
}

// NormalizeScore : normalize scores since lower scores correspond to lower latency
func (no *NetworkOverhead) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
func (no *NetworkOverhead) NormalizeScore(ctx context.Context,
state *framework.CycleState,
pod *corev1.Pod,
scores framework.NodeScoreList) *framework.Status {
klog.V(4).InfoS("before normalization: ", "scores", scores)

// Get Min and Max Scores to normalize between framework.MaxNodeScore and framework.MinNodeScore
Expand Down Expand Up @@ -405,8 +432,8 @@ func getMinMaxScores(scores framework.NodeScoreList) (int64, int64) {
}

// sortNetworkTopologyCosts : sort costs if manual weights were selected
func (no *NetworkOverhead) sortNetworkTopologyCosts(networkTopology *ntv1alpha1.NetworkTopology) {
if no.weightsName != ntv1alpha1.NetworkTopologyNetperfCosts { // Manual weights were selected
func (no *NetworkOverhead) sortNetworkTopologyCosts(networkTopology *nettopov1a1.NetworkTopology) {
if no.weightsName != nettopov1a1.NetworkTopologyNetperfCosts { // Manual weights were selected
for _, w := range networkTopology.Spec.Weights {
// Sort costs by TopologyKey; they might not be sorted since they were manually defined
sort.Sort(networkawareutil.ByTopologyKey(w.TopologyList))
Expand All @@ -415,17 +442,21 @@ func (no *NetworkOverhead) sortNetworkTopologyCosts(networkTopology *ntv1alpha1.
}

// populateCostMap : Populates costMap based on the node being filtered/scored
func (no *NetworkOverhead) populateCostMap(costMap map[networkawareutil.CostKey]int64, networkTopology *ntv1alpha1.NetworkTopology, region string, zone string) {
func (no *NetworkOverhead) populateCostMap(
costMap map[networkawareutil.CostKey]int64,
networkTopology *nettopov1a1.NetworkTopology,
region string,
zone string) {
for _, w := range networkTopology.Spec.Weights { // Check the weights List
if w.Name != no.weightsName { // If it is not the Preferred algorithm, continue
continue
}

if region != "" { // Add Region Costs
// Binary search through CostList: find the Topology Key for region
topologyList := networkawareutil.FindTopologyKey(w.TopologyList, ntv1alpha1.NetworkTopologyRegion)
topologyList := networkawareutil.FindTopologyKey(w.TopologyList, nettopov1a1.NetworkTopologyRegion)

if no.weightsName != ntv1alpha1.NetworkTopologyNetperfCosts {
if no.weightsName != nettopov1a1.NetworkTopologyNetperfCosts {
// Sort costs by origin; they might not be sorted since they were manually defined
sort.Sort(networkawareutil.ByOrigin(topologyList))
}
Expand All @@ -442,9 +473,9 @@ func (no *NetworkOverhead) populateCostMap(costMap map[networkawareutil.CostKey]
}
if zone != "" { // Add Zone Costs
// Binary search through CostList: find the Topology Key for zone
topologyList := networkawareutil.FindTopologyKey(w.TopologyList, ntv1alpha1.NetworkTopologyZone)
topologyList := networkawareutil.FindTopologyKey(w.TopologyList, nettopov1a1.NetworkTopologyZone)

if no.weightsName != ntv1alpha1.NetworkTopologyNetperfCosts {
if no.weightsName != nettopov1a1.NetworkTopologyNetperfCosts {
// Sort costs by origin; they might not be sorted since they were manually defined
sort.Sort(networkawareutil.ByOrigin(topologyList))
}
Expand All @@ -463,9 +494,14 @@ func (no *NetworkOverhead) populateCostMap(costMap map[networkawareutil.CostKey]
}

// checkMaxNetworkCostRequirements : verifies the number of met and unmet dependencies based on the pod being filtered
func checkMaxNetworkCostRequirements(scheduledList networkawareutil.ScheduledList, dependencyList []agv1alpha1.DependenciesInfo, nodeInfo *framework.NodeInfo, region string,
zone string, costMap map[networkawareutil.CostKey]int64, no *NetworkOverhead) (int64, int64, error) {

func checkMaxNetworkCostRequirements(
scheduledList networkawareutil.ScheduledList,
dependencyList []appgroupv1a1.DependenciesInfo,
nodeInfo *framework.NodeInfo,
region string,
zone string,
costMap map[networkawareutil.CostKey]int64,
no *NetworkOverhead) (int64, int64, error) {
var satisfied int64 = 0
var violated int64 = 0

Expand Down Expand Up @@ -533,9 +569,13 @@ func checkMaxNetworkCostRequirements(scheduledList networkawareutil.ScheduledLis
}

// getAccumulatedCost : calculate the accumulated cost based on the Pod's dependencies
func (no *NetworkOverhead) getAccumulatedCost(scheduledList networkawareutil.ScheduledList, dependencyList []agv1alpha1.DependenciesInfo, nodeName string, region string,
zone string, costMap map[networkawareutil.CostKey]int64) (int64, error) {

func (no *NetworkOverhead) getAccumulatedCost(
scheduledList networkawareutil.ScheduledList,
dependencyList []appgroupv1a1.DependenciesInfo,
nodeName string,
region string,
zone string,
costMap map[networkawareutil.CostKey]int64) (int64, error) {
// keep track of the accumulated cost
var cost int64 = 0

Expand Down Expand Up @@ -607,34 +647,42 @@ func getPreFilterState(cycleState *framework.CycleState) (*PreFilterState, error
return state, nil
}

func (no *NetworkOverhead) findAppGroupNetworkOverhead(agName string) *agv1alpha1.AppGroup {
func (no *NetworkOverhead) findAppGroupNetworkOverhead(agName string) *appgroupv1a1.AppGroup {
klog.V(6).InfoS("namespaces: %s", no.namespaces)
for _, namespace := range no.namespaces {
klog.V(6).InfoS("appGroup CR", "namespace", namespace, "ag.lister", no.agLister)
klog.V(6).InfoS("appGroup CR", "namespace", namespace, "name", agName)
// An AppGroup cannot be placed in several namespaces simultaneously
appGroup, err := no.agLister.AppGroups(namespace).Get(agName)
appGroup := &appgroupv1a1.AppGroup{}
err := no.Get(context.TODO(), client.ObjectKey{
Namespace: namespace,
Name: agName,
}, appGroup)
if err != nil {
klog.V(4).InfoS("Cannot get AppGroup from AppGroupNamespaceLister:", "error", err)
klog.V(4).ErrorS(err, "Cannot get AppGroup from AppGroupNamespaceLister:")
continue
}
if appGroup != nil {
if appGroup != nil && appGroup.GetUID() != "" {
return appGroup
}
}
return nil
}

func (no *NetworkOverhead) findNetworkTopologyNetworkOverhead() *ntv1alpha1.NetworkTopology {
func (no *NetworkOverhead) findNetworkTopologyNetworkOverhead() *nettopov1a1.NetworkTopology {
klog.V(6).InfoS("namespaces: %s", no.namespaces)
for _, namespace := range no.namespaces {
klog.V(6).InfoS("networkTopology CR:", "namespace", namespace, "nt.lister", no.ntLister)
klog.V(6).InfoS("networkTopology CR:", "namespace", namespace, "name", no.ntName)
// A NetworkTopology cannot be placed in several namespaces simultaneously
networkTopology, err := no.ntLister.NetworkTopologies(namespace).Get(no.ntName)
networkTopology := &nettopov1a1.NetworkTopology{}
err := no.Get(context.TODO(), client.ObjectKey{
Namespace: namespace,
Name: no.ntName,
}, networkTopology)
if err != nil {
klog.V(4).InfoS("Cannot get networkTopology from networkTopologyNamespaceLister:", "error", err)
klog.V(4).ErrorS(err, "Cannot get networkTopology from networkTopologyNamespaceLister:")
continue
}
if networkTopology != nil {
if networkTopology != nil && networkTopology.GetUID() != "" {
return networkTopology
}
}
Expand Down
Loading

0 comments on commit 9096af8

Please sign in to comment.