Skip to content

Commit

Permalink
test: migrate networkaware testing to ctrl runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
zwpaper committed Oct 31, 2023
1 parent 9096af8 commit a4ecf1a
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 214 deletions.
66 changes: 32 additions & 34 deletions pkg/networkaware/networkoverhead/networkoverhead.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

Expand All @@ -35,8 +36,8 @@ import (
pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config"
networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util"

appgroupv1a1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1"
nettopov1a1 "github.com/diktyo-io/networktopology-api/pkg/apis/networktopology/v1alpha1"
agv1alpha1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1"
ntv1alpha1 "github.com/diktyo-io/networktopology-api/pkg/apis/networktopology/v1alpha1"
)

var _ framework.PreFilterPlugin = &NetworkOverhead{}
Expand Down Expand Up @@ -64,6 +65,7 @@ const (
type NetworkOverhead struct {
client.Client

podLister corelisters.PodLister
handle framework.Handle
namespaces []string
weightsName string
Expand All @@ -79,13 +81,13 @@ type PreFilterState struct {
agName string

// AppGroup CR
appGroup *appgroupv1a1.AppGroup
appGroup *agv1alpha1.AppGroup

// NetworkTopology CR
networkTopology *nettopov1a1.NetworkTopology
networkTopology *ntv1alpha1.NetworkTopology

// Dependency List of the given pod
dependencyList []appgroupv1a1.DependenciesInfo
dependencyList []agv1alpha1.DependenciesInfo

// Pods already scheduled based on the dependency list
scheduledList networkawareutil.ScheduledList
Expand Down Expand Up @@ -140,8 +142,8 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)

utilruntime.Must(clientgoscheme.AddToScheme(scheme))

utilruntime.Must(appgroupv1a1.AddToScheme(scheme))
utilruntime.Must(nettopov1a1.AddToScheme(scheme))
utilruntime.Must(agv1alpha1.AddToScheme(scheme))
utilruntime.Must(ntv1alpha1.AddToScheme(scheme))

client, err := client.New(handle.KubeConfig(), client.Options{
Scheme: scheme,
Expand All @@ -151,7 +153,9 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
}

no := &NetworkOverhead{
Client: client,
Client: client,

podLister: handle.SharedInformerFactory().Core().V1().Pods().Lister(),
handle: handle,
namespaces: args.Namespaces,
weightsName: args.WeightsName,
Expand Down Expand Up @@ -199,28 +203,22 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle
return nil, framework.NewStatus(framework.Success, "Pod has no dependencies, return")
}

podList := &corev1.PodList{}
if err := no.List(ctx, podList,
client.MatchingLabelsSelector{
Selector: labels.Set(map[string]string{
appgroupv1a1.AppGroupLabel: agName,
}).AsSelector(),
}); err != nil {
klog.ErrorS(err, "List pods for group failed")
return nil, framework.NewStatus(
framework.Success, "Error while returning pods from appGroup, return")
// Get pods from lister
selector := labels.Set(map[string]string{agv1alpha1.AppGroupLabel: agName}).AsSelector()
pods, err := no.podLister.List(selector)
if err != nil {
return nil, framework.NewStatus(framework.Success, "Error while returning pods from appGroup, return")
}
pods := podList.Items

// Return if pods are not yet allocated for the AppGroup...
if pods == nil || len(pods) == 0 {
if len(pods) == 0 {
return nil, framework.NewStatus(framework.Success, "No pods yet allocated, return")
}

// Pods already scheduled: Get Scheduled List (Deployment name, replicaID, hostname)
scheduledList := networkawareutil.GetScheduledList(pods)
// Check if scheduledList is empty...
if scheduledList == nil || len(scheduledList) == 0 {
if len(scheduledList) == 0 {
klog.ErrorS(nil, "Scheduled list is empty, return")
return nil, framework.NewStatus(framework.Success, "Scheduled list is empty, return")
}
Expand Down Expand Up @@ -432,8 +430,8 @@ func getMinMaxScores(scores framework.NodeScoreList) (int64, int64) {
}

// sortNetworkTopologyCosts : sort costs if manual weights were selected
func (no *NetworkOverhead) sortNetworkTopologyCosts(networkTopology *nettopov1a1.NetworkTopology) {
if no.weightsName != nettopov1a1.NetworkTopologyNetperfCosts { // Manual weights were selected
func (no *NetworkOverhead) sortNetworkTopologyCosts(networkTopology *ntv1alpha1.NetworkTopology) {
if no.weightsName != ntv1alpha1.NetworkTopologyNetperfCosts { // Manual weights were selected
for _, w := range networkTopology.Spec.Weights {
// Sort Costs by TopologyKey, might not be sorted since were manually defined
sort.Sort(networkawareutil.ByTopologyKey(w.TopologyList))
Expand All @@ -444,7 +442,7 @@ func (no *NetworkOverhead) sortNetworkTopologyCosts(networkTopology *nettopov1a1
// populateCostMap : Populates costMap based on the node being filtered/scored
func (no *NetworkOverhead) populateCostMap(
costMap map[networkawareutil.CostKey]int64,
networkTopology *nettopov1a1.NetworkTopology,
networkTopology *ntv1alpha1.NetworkTopology,
region string,
zone string) {
for _, w := range networkTopology.Spec.Weights { // Check the weights List
Expand All @@ -454,9 +452,9 @@ func (no *NetworkOverhead) populateCostMap(

if region != "" { // Add Region Costs
// Binary search through CostList: find the Topology Key for region
topologyList := networkawareutil.FindTopologyKey(w.TopologyList, nettopov1a1.NetworkTopologyRegion)
topologyList := networkawareutil.FindTopologyKey(w.TopologyList, ntv1alpha1.NetworkTopologyRegion)

if no.weightsName != nettopov1a1.NetworkTopologyNetperfCosts {
if no.weightsName != ntv1alpha1.NetworkTopologyNetperfCosts {
// Sort Costs by origin, might not be sorted since were manually defined
sort.Sort(networkawareutil.ByOrigin(topologyList))
}
Expand All @@ -473,9 +471,9 @@ func (no *NetworkOverhead) populateCostMap(
}
if zone != "" { // Add Zone Costs
// Binary search through CostList: find the Topology Key for zone
topologyList := networkawareutil.FindTopologyKey(w.TopologyList, nettopov1a1.NetworkTopologyZone)
topologyList := networkawareutil.FindTopologyKey(w.TopologyList, ntv1alpha1.NetworkTopologyZone)

if no.weightsName != nettopov1a1.NetworkTopologyNetperfCosts {
if no.weightsName != ntv1alpha1.NetworkTopologyNetperfCosts {
// Sort Costs by origin, might not be sorted since were manually defined
sort.Sort(networkawareutil.ByOrigin(topologyList))
}
Expand All @@ -496,7 +494,7 @@ func (no *NetworkOverhead) populateCostMap(
// checkMaxNetworkCostRequirements : verifies the number of met and unmet dependencies based on the pod being filtered
func checkMaxNetworkCostRequirements(
scheduledList networkawareutil.ScheduledList,
dependencyList []appgroupv1a1.DependenciesInfo,
dependencyList []agv1alpha1.DependenciesInfo,
nodeInfo *framework.NodeInfo,
region string,
zone string,
Expand Down Expand Up @@ -571,7 +569,7 @@ func checkMaxNetworkCostRequirements(
// getAccumulatedCost : calculate the accumulated cost based on the Pod's dependencies
func (no *NetworkOverhead) getAccumulatedCost(
scheduledList networkawareutil.ScheduledList,
dependencyList []appgroupv1a1.DependenciesInfo,
dependencyList []agv1alpha1.DependenciesInfo,
nodeName string,
region string,
zone string,
Expand Down Expand Up @@ -647,12 +645,12 @@ func getPreFilterState(cycleState *framework.CycleState) (*PreFilterState, error
return state, nil
}

func (no *NetworkOverhead) findAppGroupNetworkOverhead(agName string) *appgroupv1a1.AppGroup {
func (no *NetworkOverhead) findAppGroupNetworkOverhead(agName string) *agv1alpha1.AppGroup {
klog.V(6).InfoS("namespaces: %s", no.namespaces)
for _, namespace := range no.namespaces {
klog.V(6).InfoS("appGroup CR", "namespace", namespace, "name", agName)
// AppGroup could not be placed in several namespaces simultaneously
appGroup := &appgroupv1a1.AppGroup{}
appGroup := &agv1alpha1.AppGroup{}
err := no.Get(context.TODO(), client.ObjectKey{
Namespace: namespace,
Name: agName,
Expand All @@ -668,12 +666,12 @@ func (no *NetworkOverhead) findAppGroupNetworkOverhead(agName string) *appgroupv
return nil
}

func (no *NetworkOverhead) findNetworkTopologyNetworkOverhead() *nettopov1a1.NetworkTopology {
func (no *NetworkOverhead) findNetworkTopologyNetworkOverhead() *ntv1alpha1.NetworkTopology {
klog.V(6).InfoS("namespaces: %s", no.namespaces)
for _, namespace := range no.namespaces {
klog.V(6).InfoS("networkTopology CR:", "namespace", namespace, "name", no.ntName)
// NetworkTopology could not be placed in several namespaces simultaneously
networkTopology := &nettopov1a1.NetworkTopology{}
networkTopology := &ntv1alpha1.NetworkTopology{}
err := no.Get(context.TODO(), client.ObjectKey{
Namespace: namespace,
Name: no.ntName,
Expand Down
Loading

0 comments on commit a4ecf1a

Please sign in to comment.