From 9096af883ab2e9361e2960d85e50462ea7c8957a Mon Sep 17 00:00:00 2001 From: Wei Zhang Date: Fri, 24 Feb 2023 00:24:54 +0800 Subject: [PATCH] refactor: migrate appgroup and net topo client to ctrl runtime Signed-off-by: Wei Zhang --- .../networkoverhead/networkoverhead.go | 166 +++++++++++------- .../networkoverhead/networkoverhead_test.go | 39 ++-- .../topologicalsort/topologicalsort.go | 35 ++-- pkg/networkaware/util/util.go | 61 +------ 4 files changed, 150 insertions(+), 151 deletions(-) diff --git a/pkg/networkaware/networkoverhead/networkoverhead.go b/pkg/networkaware/networkoverhead/networkoverhead.go index f4fce9355..494bd60c8 100644 --- a/pkg/networkaware/networkoverhead/networkoverhead.go +++ b/pkg/networkaware/networkoverhead/networkoverhead.go @@ -22,20 +22,21 @@ import ( "math" "sort" - v1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" - corelisters "k8s.io/client-go/listers/core/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" + "sigs.k8s.io/controller-runtime/pkg/client" + pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config" networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util" - agv1alpha1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1" - aglisters "github.com/diktyo-io/appgroup-api/pkg/generated/listers/appgroup/v1alpha1" - ntv1alpha1 "github.com/diktyo-io/networktopology-api/pkg/apis/networktopology/v1alpha1" - ntlisters "github.com/diktyo-io/networktopology-api/pkg/generated/listers/networktopology/v1alpha1" + appgroupv1a1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1" + nettopov1a1 "github.com/diktyo-io/networktopology-api/pkg/apis/networktopology/v1alpha1" ) var _ framework.PreFilterPlugin = &NetworkOverhead{} @@ -61,10 +62,9 @@ const ( // NetworkOverhead : Filter and Score nodes based on Pod's AppGroup requirements: MaxNetworkCosts requirements among Pods with dependencies type NetworkOverhead struct { + client.Client + handle framework.Handle - podLister corelisters.PodLister - agLister aglisters.AppGroupLister - ntLister ntlisters.NetworkTopologyLister namespaces []string weightsName string ntName string @@ -79,13 +79,13 @@ type PreFilterState struct { agName string // AppGroup CR - appGroup *agv1alpha1.AppGroup + appGroup *appgroupv1a1.AppGroup // NetworkTopology CR - networkTopology *ntv1alpha1.NetworkTopology + networkTopology *nettopov1a1.NetworkTopology // Dependency List of the given pod - dependencyList []agv1alpha1.DependenciesInfo + dependencyList []appgroupv1a1.DependenciesInfo // Pods already scheduled based on the dependency list scheduledList networkawareutil.ScheduledList @@ -136,21 +136,23 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) return nil, err } - agLister, err := networkawareutil.InitAppGroupInformer(handle.KubeConfig()) - if err != nil { - return nil, err - } + scheme := runtime.NewScheme() + + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - ntLister, err := networkawareutil.InitNetworkTopologyInformer(handle.KubeConfig()) + utilruntime.Must(appgroupv1a1.AddToScheme(scheme)) + utilruntime.Must(nettopov1a1.AddToScheme(scheme)) + + client, err := client.New(handle.KubeConfig(), client.Options{ + Scheme: scheme, + }) if err != nil { return nil, err } no := &NetworkOverhead{ + Client: client, handle: handle, - podLister: handle.SharedInformerFactory().Core().V1().Pods().Lister(), - agLister: agLister, - ntLister: ntLister, namespaces: args.Namespaces, weightsName: args.WeightsName, ntName: args.NetworkTopologyName, @@ -165,7 +167,7 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) // 4. Update cost map of all nodes // 5. Get number of satisfied and violated dependencies // 6. Get final cost of the given node to be used in the score plugin -func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { +func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) (*framework.PreFilterResult, *framework.Status) { // Init PreFilter State preFilterState := &PreFilterState{ scoreEqually: true, @@ -197,24 +199,29 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle return nil, framework.NewStatus(framework.Success, "Pod has no dependencies, return") } - // Get pods from lister - selector := labels.Set(map[string]string{agv1alpha1.AppGroupLabel: agName}).AsSelector() - pods, err := no.podLister.List(selector) - if err != nil { - return nil, framework.NewStatus(framework.Success, "Error while returning pods from appGroup, return") + podList := &corev1.PodList{} + if err := no.List(ctx, podList, + client.MatchingLabelsSelector{ + Selector: labels.Set(map[string]string{ + appgroupv1a1.AppGroupLabel: agName, + }).AsSelector(), + }); err != nil { + klog.ErrorS(err, "List pods for group failed") + return nil, framework.NewStatus( + framework.Success, "Error while returning pods from appGroup, return") } + pods := podList.Items // Return if pods are not yet allocated for the AppGroup... - if pods == nil { + if pods == nil || len(pods) == 0 { return nil, framework.NewStatus(framework.Success, "No pods yet allocated, return") } // Pods already scheduled: Get Scheduled List (Deployment name, replicaID, hostname) scheduledList := networkawareutil.GetScheduledList(pods) - // Check if scheduledList is empty... - if scheduledList == nil { - klog.ErrorS(err, "Scheduled list is empty, return") + if scheduledList == nil || len(scheduledList) == 0 { + klog.ErrorS(nil, "Scheduled list is empty, return") return nil, framework.NewStatus(framework.Success, "Scheduled list is empty, return") } @@ -238,7 +245,10 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle // retrieve region and zone labels region := networkawareutil.GetNodeRegion(nodeInfo.Node()) zone := networkawareutil.GetNodeZone(nodeInfo.Node()) - klog.V(6).InfoS("Node info", "name", nodeInfo.Node().Name, "region", region, "zone", zone) + klog.V(6).InfoS("Node info", + "name", nodeInfo.Node().Name, + "region", region, + "zone", zone) // Create map for cost / destinations. Search for requirements faster... costMap := make(map[networkawareutil.CostKey]int64) @@ -295,18 +305,29 @@ func (no *NetworkOverhead) PreFilterExtensions() framework.PreFilterExtensions { // AddPod from pre-computed data in cycleState. // no current need for the NetworkOverhead plugin -func (no *NetworkOverhead) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { +func (no *NetworkOverhead) AddPod(ctx context.Context, + cycleState *framework.CycleState, + podToSchedule *corev1.Pod, + podToAdd *framework.PodInfo, + nodeInfo *framework.NodeInfo) *framework.Status { return framework.NewStatus(framework.Success, "") } // RemovePod from pre-computed data in cycleState. // no current need for the NetworkOverhead plugin -func (no *NetworkOverhead) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { +func (no *NetworkOverhead) RemovePod(ctx context.Context, + cycleState *framework.CycleState, + podToSchedule *corev1.Pod, + podToRemove *framework.PodInfo, + nodeInfo *framework.NodeInfo) *framework.Status { return framework.NewStatus(framework.Success, "") } // Filter : evaluate if node can respect maxNetworkCost requirements -func (no *NetworkOverhead) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { +func (no *NetworkOverhead) Filter(ctx context.Context, + cycleState *framework.CycleState, + pod *corev1.Pod, + nodeInfo *framework.NodeInfo) *framework.Status { if nodeInfo.Node() == nil { return framework.NewStatus(framework.Error, "node not found") } @@ -338,7 +359,10 @@ func (no *NetworkOverhead) Filter(ctx context.Context, cycleState *framework.Cyc } // Score : evaluate score for a node -func (no *NetworkOverhead) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { +func (no *NetworkOverhead) Score(ctx context.Context, + cycleState *framework.CycleState, + pod *corev1.Pod, + nodeName string) (int64, *framework.Status) { score := framework.MinNodeScore // Get PreFilterState @@ -360,7 +384,10 @@ func (no *NetworkOverhead) Score(ctx context.Context, cycleState *framework.Cycl } // NormalizeScore : normalize scores since lower scores correspond to lower latency -func (no *NetworkOverhead) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { +func (no *NetworkOverhead) NormalizeScore(ctx context.Context, + state *framework.CycleState, + pod *corev1.Pod, + scores framework.NodeScoreList) *framework.Status { klog.V(4).InfoS("before normalization: ", "scores", scores) // Get Min and Max Scores to normalize between framework.MaxNodeScore and framework.MinNodeScore @@ -405,8 +432,8 @@ func getMinMaxScores(scores framework.NodeScoreList) (int64, int64) { } // sortNetworkTopologyCosts : sort costs if manual weights were selected -func (no *NetworkOverhead) sortNetworkTopologyCosts(networkTopology *ntv1alpha1.NetworkTopology) { - if no.weightsName != ntv1alpha1.NetworkTopologyNetperfCosts { // Manual weights were selected +func (no *NetworkOverhead) sortNetworkTopologyCosts(networkTopology *nettopov1a1.NetworkTopology) { + if no.weightsName != nettopov1a1.NetworkTopologyNetperfCosts { // Manual weights were selected for _, w := range networkTopology.Spec.Weights { // Sort Costs by TopologyKey, might not be sorted since were manually defined sort.Sort(networkawareutil.ByTopologyKey(w.TopologyList)) @@ -415,7 +442,11 @@ func (no *NetworkOverhead) sortNetworkTopologyCosts(networkTopology *ntv1alpha1. } // populateCostMap : Populates costMap based on the node being filtered/scored -func (no *NetworkOverhead) populateCostMap(costMap map[networkawareutil.CostKey]int64, networkTopology *ntv1alpha1.NetworkTopology, region string, zone string) { +func (no *NetworkOverhead) populateCostMap( + costMap map[networkawareutil.CostKey]int64, + networkTopology *nettopov1a1.NetworkTopology, + region string, + zone string) { for _, w := range networkTopology.Spec.Weights { // Check the weights List if w.Name != no.weightsName { // If it is not the Preferred algorithm, continue continue @@ -423,9 +454,9 @@ func (no *NetworkOverhead) populateCostMap(costMap map[networkawareutil.CostKey] if region != "" { // Add Region Costs // Binary search through CostList: find the Topology Key for region - topologyList := networkawareutil.FindTopologyKey(w.TopologyList, ntv1alpha1.NetworkTopologyRegion) + topologyList := networkawareutil.FindTopologyKey(w.TopologyList, nettopov1a1.NetworkTopologyRegion) - if no.weightsName != ntv1alpha1.NetworkTopologyNetperfCosts { + if no.weightsName != nettopov1a1.NetworkTopologyNetperfCosts { // Sort Costs by origin, might not be sorted since were manually defined sort.Sort(networkawareutil.ByOrigin(topologyList)) } @@ -442,9 +473,9 @@ func (no *NetworkOverhead) populateCostMap(costMap map[networkawareutil.CostKey] } if zone != "" { // Add Zone Costs // Binary search through CostList: find the Topology Key for zone - topologyList := networkawareutil.FindTopologyKey(w.TopologyList, ntv1alpha1.NetworkTopologyZone) + topologyList := networkawareutil.FindTopologyKey(w.TopologyList, nettopov1a1.NetworkTopologyZone) - if no.weightsName != ntv1alpha1.NetworkTopologyNetperfCosts { + if no.weightsName != nettopov1a1.NetworkTopologyNetperfCosts { // Sort Costs by origin, might not be sorted since were manually defined sort.Sort(networkawareutil.ByOrigin(topologyList)) } @@ -463,9 +494,14 @@ func (no *NetworkOverhead) populateCostMap(costMap map[networkawareutil.CostKey] } // checkMaxNetworkCostRequirements : verifies the number of met and unmet dependencies based on the pod being filtered -func checkMaxNetworkCostRequirements(scheduledList networkawareutil.ScheduledList, dependencyList []agv1alpha1.DependenciesInfo, nodeInfo *framework.NodeInfo, region string, - zone string, costMap map[networkawareutil.CostKey]int64, no *NetworkOverhead) (int64, int64, error) { - +func checkMaxNetworkCostRequirements( + scheduledList networkawareutil.ScheduledList, + dependencyList []appgroupv1a1.DependenciesInfo, + nodeInfo *framework.NodeInfo, + region string, + zone string, + costMap map[networkawareutil.CostKey]int64, + no *NetworkOverhead) (int64, int64, error) { var satisfied int64 = 0 var violated int64 = 0 @@ -533,9 +569,13 @@ func checkMaxNetworkCostRequirements(scheduledList networkawareutil.ScheduledLis } // getAccumulatedCost : calculate the accumulated cost based on the Pod's dependencies -func (no *NetworkOverhead) getAccumulatedCost(scheduledList networkawareutil.ScheduledList, dependencyList []agv1alpha1.DependenciesInfo, nodeName string, region string, - zone string, costMap map[networkawareutil.CostKey]int64) (int64, error) { - +func (no *NetworkOverhead) getAccumulatedCost( + scheduledList networkawareutil.ScheduledList, + dependencyList []appgroupv1a1.DependenciesInfo, + nodeName string, + region string, + zone string, + costMap map[networkawareutil.CostKey]int64) (int64, error) { // keep track of the accumulated cost var cost int64 = 0 @@ -607,34 +647,42 @@ func getPreFilterState(cycleState *framework.CycleState) (*PreFilterState, error return state, nil } -func (no *NetworkOverhead) findAppGroupNetworkOverhead(agName string) *agv1alpha1.AppGroup { +func (no *NetworkOverhead) findAppGroupNetworkOverhead(agName string) *appgroupv1a1.AppGroup { klog.V(6).InfoS("namespaces: %s", no.namespaces) for _, namespace := range no.namespaces { - klog.V(6).InfoS("appGroup CR", "namespace", namespace, "ag.lister", no.agLister) + klog.V(6).InfoS("appGroup CR", "namespace", namespace, "name", agName) // AppGroup could not be placed in several namespaces simultaneously - appGroup, err := no.agLister.AppGroups(namespace).Get(agName) + appGroup := &appgroupv1a1.AppGroup{} + err := no.Get(context.TODO(), client.ObjectKey{ + Namespace: namespace, + Name: agName, + }, appGroup) if err != nil { - klog.V(4).InfoS("Cannot get AppGroup from AppGroupNamespaceLister:", "error", err) + klog.V(4).ErrorS(err, "Cannot get AppGroup from AppGroupNamespaceLister:") continue } - if appGroup != nil { + if appGroup != nil && appGroup.GetUID() != "" { return appGroup } } return nil } -func (no *NetworkOverhead) findNetworkTopologyNetworkOverhead() *ntv1alpha1.NetworkTopology { +func (no *NetworkOverhead) findNetworkTopologyNetworkOverhead() *nettopov1a1.NetworkTopology { klog.V(6).InfoS("namespaces: %s", no.namespaces) for _, namespace := range no.namespaces { - klog.V(6).InfoS("networkTopology CR:", "namespace", namespace, "nt.lister", no.ntLister) + klog.V(6).InfoS("networkTopology CR:", "namespace", namespace, "name", no.ntName) // NetworkTopology could not be placed in several namespaces simultaneously - networkTopology, err := no.ntLister.NetworkTopologies(namespace).Get(no.ntName) + networkTopology := &nettopov1a1.NetworkTopology{} + err := no.Get(context.TODO(), client.ObjectKey{ + Namespace: namespace, + Name: no.ntName, + }, networkTopology) if err != nil { - klog.V(4).InfoS("Cannot get networkTopology from networkTopologyNamespaceLister:", "error", err) + klog.V(4).ErrorS(err, "Cannot get networkTopology from networkTopologyNamespaceLister:") continue } - if networkTopology != nil { + if networkTopology != nil && networkTopology.GetUID() != "" { return networkTopology } } diff --git a/pkg/networkaware/networkoverhead/networkoverhead_test.go b/pkg/networkaware/networkoverhead/networkoverhead_test.go index 67266da32..7eb50d0b7 100644 --- a/pkg/networkaware/networkoverhead/networkoverhead_test.go +++ b/pkg/networkaware/networkoverhead/networkoverhead_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "k8s.io/client-go/kubernetes/scheme" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -37,6 +38,9 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + agv1alpha1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1" agfake "github.com/diktyo-io/appgroup-api/pkg/generated/clientset/versioned/fake" aginformers "github.com/diktyo-io/appgroup-api/pkg/generated/informers/externalversions" @@ -479,41 +483,27 @@ func BenchmarkNetworkOverheadPreFilter(b *testing.B) { for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - // init listers - agClient := agfake.NewSimpleClientset() - ntClient := ntfake.NewSimpleClientset() - - fakeAgInformer := aginformers.NewSharedInformerFactory(agClient, 0).Appgroup().V1alpha1().AppGroups() - fakeNTInformer := ntinformers.NewSharedInformerFactory(ntClient, 0).Networktopology().V1alpha1().NetworkTopologies() + s := scheme.Scheme // init nodes nodes := getNodes(tt.nodesNum, tt.regionNames, tt.zoneNames) + s.AddKnownTypes(v1.SchemeGroupVersion, nodes[0], tt.pods[0]) // Create dependencies tt.appGroup.Status.RunningWorkloads = tt.dependenciesNum + s.AddKnownTypes(agv1alpha1.SchemeGroupVersion, tt.appGroup) + s.AddKnownTypes(ntv1alpha1.SchemeGroupVersion, tt.networkTopology) - // add CRDs - agLister := fakeAgInformer.Lister() - ntLister := fakeNTInformer.Lister() - - _ = fakeAgInformer.Informer().GetStore().Add(tt.appGroup) - _ = fakeNTInformer.Informer().GetStore().Add(tt.networkTopology) + client := fake.NewClientBuilder(). + WithScheme(s). + WithRuntimeObjects(tt.appGroup, tt.networkTopology). + Build() // create plugin ctx := context.Background() - cs := testClientSet.NewSimpleClientset() - - informerFactory := informers.NewSharedInformerFactory(cs, 0) - - snapshot := newTestSharedLister(nil, nodes) - - podInformer := informerFactory.Core().V1().Pods() - podLister := podInformer.Lister() - informerFactory.Start(ctx.Done()) for _, p := range tt.pods { - _, err := cs.CoreV1().Pods("default").Create(ctx, p, metav1.CreateOptions{}) - if err != nil { + if err := client.Create(ctx, p); err != nil { b.Fatalf("Failed to create Workload %q: %v", p.Name, err) } } @@ -523,7 +513,8 @@ func BenchmarkNetworkOverheadPreFilter(b *testing.B) { st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), } - fh, _ := st.NewFramework(registeredPlugins, "default-scheduler", ctx.Done(), runtime.WithClientSet(cs), + snapshot := newTestSharedLister(nil, nodes) + fh, _ := st.NewFramework(registeredPlugins, "default-scheduler", ctx.Done(), runtime.WithClientSet(cs), runtime.WithInformerFactory(informerFactory), runtime.WithSnapshotSharedLister(snapshot)) pl := &NetworkOverhead{ diff --git a/pkg/networkaware/topologicalsort/topologicalsort.go b/pkg/networkaware/topologicalsort/topologicalsort.go index 30e22569b..6cf2b973d 100644 --- a/pkg/networkaware/topologicalsort/topologicalsort.go +++ b/pkg/networkaware/topologicalsort/topologicalsort.go @@ -17,18 +17,22 @@ limitations under the License. package topologicalsort import ( + "context" "fmt" "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" + "sigs.k8s.io/controller-runtime/pkg/client" + pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config" networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util" - agv1alpha "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1" - aglisters "github.com/diktyo-io/appgroup-api/pkg/generated/listers/appgroup/v1alpha1" + appgroupv1a1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1" ) const ( @@ -38,8 +42,8 @@ const ( // TopologicalSort : Sort pods based on their AppGroup and corresponding microservice dependencies type TopologicalSort struct { + client.Client handle framework.Handle - agLister aglisters.AppGroupLister namespaces []string } @@ -69,14 +73,21 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) return nil, err } - agLister, err := networkawareutil.InitAppGroupInformer(handle.KubeConfig()) + scheme := runtime.NewScheme() + + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(appgroupv1a1.AddToScheme(scheme)) + + client, err := client.New(handle.KubeConfig(), client.Options{ + Scheme: scheme, + }) if err != nil { return nil, err } pl := &TopologicalSort{ + Client: client, handle: handle, - agLister: agLister, namespaces: args.Namespaces, } return pl, nil @@ -106,8 +117,8 @@ func (ts *TopologicalSort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool { labelsP2 := pInfo2.Pod.GetLabels() // Binary search to find both order index since topology list is ordered by Workload Name - orderP1 := networkawareutil.FindPodOrder(appGroup.Status.TopologyOrder, labelsP1[agv1alpha.AppGroupSelectorLabel]) - orderP2 := networkawareutil.FindPodOrder(appGroup.Status.TopologyOrder, labelsP2[agv1alpha.AppGroupSelectorLabel]) + orderP1 := networkawareutil.FindPodOrder(appGroup.Status.TopologyOrder, labelsP1[appgroupv1a1.AppGroupSelectorLabel]) + orderP2 := networkawareutil.FindPodOrder(appGroup.Status.TopologyOrder, labelsP2[appgroupv1a1.AppGroupSelectorLabel]) klog.V(6).InfoS("Pod order values", "p1 order", orderP1, "p2 order", orderP2) @@ -115,12 +126,16 @@ func (ts *TopologicalSort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool { return orderP1 <= orderP2 } -func (ts *TopologicalSort) findAppGroupTopologicalSort(agName string) *agv1alpha.AppGroup { +func (ts *TopologicalSort) findAppGroupTopologicalSort(agName string) *appgroupv1a1.AppGroup { klog.V(6).InfoS("namespaces: %s", ts.namespaces) for _, namespace := range ts.namespaces { - klog.V(6).InfoS("appGroup CR", "namespace", namespace, "ag.lister", ts.agLister) + klog.V(6).InfoS("appGroup CR", "namespace", namespace, "name", agName) // AppGroup couldn't be placed in several namespaces simultaneously - appGroup, err := ts.agLister.AppGroups(namespace).Get(agName) + appGroup := &appgroupv1a1.AppGroup{} + err := ts.Get(context.TODO(), client.ObjectKey{ + Namespace: namespace, + Name: agName, + }, appGroup) if err != nil { klog.V(4).InfoS("Cannot get AppGroup from AppGroupNamespaceLister:", "error", err) continue diff --git a/pkg/networkaware/util/util.go b/pkg/networkaware/util/util.go index 729cbfca8..d4e9f0642 100644 --- a/pkg/networkaware/util/util.go +++ b/pkg/networkaware/util/util.go @@ -17,20 +17,10 @@ limitations under the License. package util import ( - "context" - v1 "k8s.io/api/core/v1" - restclient "k8s.io/client-go/rest" - "k8s.io/klog/v2" agv1alpha1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1" - agclientset "github.com/diktyo-io/appgroup-api/pkg/generated/clientset/versioned" - aginformers "github.com/diktyo-io/appgroup-api/pkg/generated/informers/externalversions" - agListers "github.com/diktyo-io/appgroup-api/pkg/generated/listers/appgroup/v1alpha1" ntv1alpha1 "github.com/diktyo-io/networktopology-api/pkg/apis/networktopology/v1alpha1" - ntclientset "github.com/diktyo-io/networktopology-api/pkg/generated/clientset/versioned" - ntinformers "github.com/diktyo-io/networktopology-api/pkg/generated/informers/externalversions" - ntListers "github.com/diktyo-io/networktopology-api/pkg/generated/listers/networktopology/v1alpha1" ) // CostKey : key for map concerning network costs (origin / destinations) @@ -200,51 +190,6 @@ func FindTopologyKey(topologyList []ntv1alpha1.TopologyInfo, key ntv1alpha1.Topo return ntv1alpha1.OriginList{} } -// AssignedPod : selects pods that are assigned (scheduled and running). -func AssignedPod(pod *v1.Pod) bool { - return len(pod.Spec.NodeName) != 0 -} - -// InitAppGroupInformer : starts AppGroup informer -func InitAppGroupInformer(kubeConfig *restclient.Config) (agListers.AppGroupLister, error) { - agClient, err := agclientset.NewForConfig(kubeConfig) - if err != nil { - klog.Errorf("Cannot create clientset for AppGroup Informer: %s, %s", kubeConfig, err) - return nil, err - } - - agInformerFactory := aginformers.NewSharedInformerFactory(agClient, 0) - agInformer := agInformerFactory.Appgroup().V1alpha1().AppGroups() - appGroupLister := agInformer.Lister() - - klog.V(5).InfoS("start appGroupInformer") - ctx := context.Background() - agInformerFactory.Start(ctx.Done()) - agInformerFactory.WaitForCacheSync(ctx.Done()) - - return appGroupLister, nil -} - -// InitNetworkTopologyInformer : starts NetworkTopology informer -func InitNetworkTopologyInformer(kubeConfig *restclient.Config) (ntListers.NetworkTopologyLister, error) { - ntClient, err := ntclientset.NewForConfig(kubeConfig) - if err != nil { - klog.Errorf("Cannot create clientset for NetworkTopology Informer: %s, %s", kubeConfig, err) - return nil, err - } - - ntInformerFactory := ntinformers.NewSharedInformerFactory(ntClient, 0) - ntInformer := ntInformerFactory.Networktopology().V1alpha1().NetworkTopologies() - appGroupLister := ntInformer.Lister() - - klog.V(5).InfoS("start networkTopology Informer") - ctx := context.Background() - ntInformerFactory.Start(ctx.Done()) - ntInformerFactory.WaitForCacheSync(ctx.Done()) - - return appGroupLister, nil -} - // GetDependencyList : get workload dependencies established in the AppGroup CR func GetDependencyList(pod *v1.Pod, ag *agv1alpha1.AppGroup) []agv1alpha1.DependenciesInfo { @@ -267,15 +212,15 @@ func GetDependencyList(pod *v1.Pod, ag *agv1alpha1.AppGroup) []agv1alpha1.Depend } // GetScheduledList : get Pods already scheduled in the cluster for that specific AppGroup -func GetScheduledList(pods []*v1.Pod) ScheduledList { +func GetScheduledList(pods []v1.Pod) ScheduledList { // scheduledList: Deployment name, replicaID, hostname scheduledList := ScheduledList{} for _, p := range pods { - if AssignedPod(p) { + if len(p.Spec.NodeName) != 0 { scheduledInfo := ScheduledInfo{ Name: p.Name, - Selector: GetPodAppGroupSelector(p), + Selector: GetPodAppGroupSelector(&p), ReplicaID: string(p.GetUID()), Hostname: p.Spec.NodeName, }