From e824f0fa05782600beca3924ec043a28eda0e121 Mon Sep 17 00:00:00 2001 From: wadecai Date: Tue, 29 Dec 2020 17:47:44 +0800 Subject: [PATCH 1/2] Split some funcs --- pkg/webhook/hook.go | 226 +++++++++++++++++++++++--------------------- 1 file changed, 120 insertions(+), 106 deletions(-) diff --git a/pkg/webhook/hook.go b/pkg/webhook/hook.go index 07cd350..8c03447 100644 --- a/pkg/webhook/hook.go +++ b/pkg/webhook/hook.go @@ -105,67 +105,31 @@ func (whsvr *webhookServer) mutate(ar *v1beta1.AdmissionReview) *v1beta1.Admissi Allowed: false, } } - if err != nil { - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: err.Error(), - }, - } - } - if req.Namespace == "kube-system" { + if shouldSkip(&pod) { return &v1beta1.AdmissionResponse{ Allowed: true, } } - if pod.Labels != nil { - if pod.Labels[util.CreatedbyDescheduler] == "true" { - return &v1beta1.AdmissionResponse{ - Allowed: true, - } - } - if !util.IsVirtualPod(&pod) { - return &v1beta1.AdmissionResponse{ - Allowed: true, - } - } - } - ref := "" - if len(pod.OwnerReferences) > 0 { - ref = string(pod.OwnerReferences[0].UID) - } + ref := getOwnerRef(&pod) clone := pod.DeepCopy() switch req.Operation { case v1beta1.Update: - node := "" - if len(ref) > 0 { - if pod.Annotations != nil { - node = pod.Annotations["unschedulable-node"] - } - if len(node) > 0 { - klog.Infof("Unschedulable nodes %+v ref %v to cache", node, ref) - freezeCache.Add(node, ref) - } - } + setUnschedulableNodes(ref, clone) return &v1beta1.AdmissionResponse{ Allowed: true, } case v1beta1.Create: - klog.Infof("Create pod %v node %v, owner %v", clone.Name, clone.Spec.NodeName, ref) - if len(ref) > 0 && len(clone.Spec.NodeName) == 0 { - nodes := freezeCache.GetFreezeNodes(ref) - klog.Infof("Not in nodes %v for %v", nodes, ref) - if len(nodes) > 0 { - klog.Infof("Create pod %v Not nodes %+v", clone.Name, nodes) - clone.Spec.Affinity, _ = util.ReplacePodNodeNameNodeAffinity(clone.Spec.Affinity, ref, 0, nil, nodes...) - } + nodes := getUnschedulableNodes(ref, clone) + if len(nodes) > 0 { + klog.Infof("Create pod %v Not nodes %+v", clone.Name, nodes) + clone.Spec.Affinity, _ = util.ReplacePodNodeNameNodeAffinity(clone.Spec.Affinity, ref, 0, nil, nodes...) } default: klog.Warningf("Skip operation: %v", req.Operation) } - whsvr.trySetNodeName(clone, req.Namespace) + whsvr.trySetNodeName(clone) inject(clone, whsvr.ignoreSelectorKeys) - klog.V(6).Infof("Final obj %+v", clone) patch, err := util.CreateJSONPatch(pod, clone) klog.Infof("Final patch %+v", string(patch)) var result metav1.Status @@ -184,45 +148,16 @@ func (whsvr *webhookServer) mutate(ar *v1beta1.AdmissionReview) *v1beta1.Admissi // Serve method for webhook server func (whsvr *webhookServer) Serve(w http.ResponseWriter, r *http.Request) { - var body []byte - if r.Body != nil { - if data, err := ioutil.ReadAll(r.Body); err == nil { - body = data - } - } - klog.V(5).Infof("Receive request: %+v", *r) - if len(body) == 0 { - klog.Error("empty body") - http.Error(w, "empty body", http.StatusBadRequest) - return - } - // verify the content type is accurate - contentType := r.Header.Get("Content-Type") - if contentType != "application/json" { - klog.Errorf("Content-Type=%s, expect application/json", contentType) - http.Error(w, "invalid Content-Type, expect `application/json`", http.StatusUnsupportedMediaType) + admissionReview, err := getRequestReview(r) + if err != nil { + klog.Error(err) + http.Error(w, err.Error(), http.StatusBadRequest) return } - var admissionResponse *v1beta1.AdmissionResponse - ar := v1beta1.AdmissionReview{} - if _, _, err := deserializer.Decode(body, nil, &ar); err != nil { - klog.Errorf("Can't decode body: %v", err) - admissionResponse = &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: err.Error(), - }, - } - } else { - if r.URL.Path == "/mutate" { - admissionResponse = whsvr.mutate(&ar) - } - } - admissionReview := v1beta1.AdmissionReview{} + admissionResponse := whsvr.mutate(admissionReview) if admissionResponse != nil { admissionReview.Response = admissionResponse - if ar.Request != nil { - admissionReview.Response.UID = ar.Request.UID - } + admissionReview.Response.UID = admissionReview.Request.UID } resp, err := json.Marshal(admissionReview) if err != nil { @@ -236,43 +171,47 @@ func (whsvr *webhookServer) Serve(w http.ResponseWriter, r *http.Request) { } } -func (whsvr *webhookServer) trySetNodeName(pod *corev1.Pod, ns string) { +func (whsvr *webhookServer) trySetNodeName(pod *corev1.Pod) { if pod.Spec.Volumes == nil { return } nodeName := "" for _, volume := range pod.Spec.Volumes { - if volume.PersistentVolumeClaim == nil { - continue - } - pvc, err := whsvr.pvcLister.PersistentVolumeClaims(ns).Get(volume.PersistentVolumeClaim.ClaimName) - if err != nil || pvc == nil { + pvcSource := volume.PersistentVolumeClaim + if pvcSource == nil { continue } - if pvc.Annotations == nil { - continue - } - if len(pvc.Annotations[util.SelectedNodeKey]) != 0 { - nodeName = pvc.Annotations[util.SelectedNodeKey] + nodeName = whsvr.getNodeNameFromPVC(pod.Namespace, pvcSource.ClaimName) + if len(nodeName) != 0 { + pod.Spec.NodeName = nodeName + klog.Infof("Set desired node name to %v ", nodeName) + return } } - if len(nodeName) != 0 { - pod.Spec.NodeName = nodeName - klog.Infof("Set desired node name to %v ", nodeName) - } return } +func (whsvr *webhookServer) getNodeNameFromPVC(ns, pvcName string) string { + var nodeName string + pvc, err := whsvr.pvcLister.PersistentVolumeClaims(ns).Get(pvcName) + if err != nil { + return nodeName + } + if pvc.Annotations == nil { + return nodeName + } + return pvc.Annotations[util.SelectedNodeKey] +} + func inject(pod *corev1.Pod, ignoreKeys []string) { nodeSelector := make(map[string]string) var affinity *corev1.Affinity - if len(pod.Spec.NodeSelector) == 0 && - pod.Spec.Affinity == nil && - pod.Spec.Tolerations == nil { + + if skipInject(pod) { return } - if pod.Spec.Affinity != nil && pod.Spec.Affinity.NodeAffinity != nil { + if pod.Spec.Affinity != nil { affinity = injectAffinity(pod.Spec.Affinity, ignoreKeys) } @@ -289,10 +228,6 @@ func inject(pod *corev1.Pod, ignoreKeys []string) { if err != nil { return } - if len(cnsByte) == 0 { - return - } - if pod.Annotations == nil { pod.Annotations = make(map[string]string) } @@ -318,6 +253,10 @@ func getPodTolerations(pod *corev1.Pod) []corev1.Toleration { } tolerations = append(tolerations, toleration) } + return addDefaultPodTolerations(tolerations, notReady, unSchedulable) +} + +func addDefaultPodTolerations(tolerations []corev1.Toleration, notReady, unSchedulable bool) []corev1.Toleration { if !notReady { tolerations = append(tolerations, desiredMap[util.TaintNodeNotReady]) } @@ -401,17 +340,17 @@ func injectAffinity(affinity *corev1.Affinity, ignoreLabels []string) *corev1.Af } } - filterdTerms := make([]corev1.NodeSelectorTerm, 0) + filteredTerms := make([]corev1.NodeSelectorTerm, 0) for _, term := range required.NodeSelectorTerms { if len(term.MatchFields) == 0 && len(term.MatchExpressions) == 0 { continue } - filterdTerms = append(filterdTerms, term) + filteredTerms = append(filteredTerms, term) } - if len(filterdTerms) == 0 { + if len(filteredTerms) == 0 { required = nil } else { - required.NodeSelectorTerms = filterdTerms + required.NodeSelectorTerms = filteredTerms } affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = required if len(nodeSelectorTerm) == 0 { @@ -421,3 +360,78 @@ func injectAffinity(affinity *corev1.Affinity, ignoreLabels []string) *corev1.Af RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{NodeSelectorTerms: nodeSelectorTerm}, }} } + +func shouldSkip(pod *corev1.Pod) bool { + if pod.Namespace == "kube-system" { + return true + } + if pod.Labels != nil { + if pod.Labels[util.CreatedbyDescheduler] == "true" { + return true + } + if !util.IsVirtualPod(pod) { + return true + } + } + return false +} + +func skipInject(pod *corev1.Pod) bool { + return len(pod.Spec.NodeSelector) == 0 && + pod.Spec.Affinity == nil && + pod.Spec.Tolerations == nil +} + +func getOwnerRef(pod *corev1.Pod) string { + ref := "" + if len(pod.OwnerReferences) > 0 { + ref = string(pod.OwnerReferences[0].UID) + } + return ref +} + +func setUnschedulableNodes(ref string, pod *corev1.Pod) { + node := "" + if len(ref) == 0 { + return + } + if pod.Annotations != nil { + node = pod.Annotations["unschedulable-node"] + } + if len(node) > 0 { + klog.Infof("Unschedulable nodes %+v ref %v to cache", node, ref) + freezeCache.Add(node, ref) + } +} + +func getUnschedulableNodes(ref string, pod *corev1.Pod) []string { + var nodes []string + if len(ref) == 0 { + return nodes + } + if len(pod.Spec.NodeName) != 0 { + return nodes + } + nodes = freezeCache.GetFreezeNodes(ref) + klog.Infof("Not in nodes %v for %v", nodes, ref) + return nodes +} + +func getRequestReview(r *http.Request) (*v1beta1.AdmissionReview, error) { + if r.Body == nil { + return nil, fmt.Errorf("empty body") + } + body, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + klog.V(5).Infof("Receive request: %+v", *r) + if len(body) == 0 { + return nil, fmt.Errorf("empty body") + } + ar := v1beta1.AdmissionReview{} + if deserializer.Decode(body, nil, &ar); err != nil { + return nil, fmt.Errorf("Can't decode body: %v", err) + } + return &ar, nil +} From e66d06d44ced19896281c6b8183f0b38a19f5603 Mon Sep 17 00:00:00 2001 From: wadecai Date: Tue, 29 Dec 2020 18:24:03 +0800 Subject: [PATCH 2/2] Change some func declares --- cmd/provider/main.go | 47 +++------------------- pkg/controllers/pv_controller.go | 14 +++++-- pkg/controllers/service_controller.go | 16 +++++--- pkg/controllers/service_controller_test.go | 14 +------ pkg/descheduler/descheduler.go | 1 - pkg/descheduler/evictions/evictions.go | 3 +- 6 files changed, 29 insertions(+), 66 deletions(-) diff --git a/cmd/provider/main.go b/cmd/provider/main.go index b4c9735..9b48c1c 100644 --- a/cmd/provider/main.go +++ b/cmd/provider/main.go @@ -34,7 +34,6 @@ import ( "golang.org/x/time/rate" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/util/workqueue" "k8s.io/klog" @@ -89,8 +88,7 @@ func main() { cfg.ConfigPath = o.KubeConfigPath provider, err := k8sprovider.NewVirtualK8S(cfg, &cc, ignoreLabels, enableServiceAccount, o) if err == nil { - go RunController(ctx, provider.GetMaster(), - provider.GetClient(), provider.GetNameSpaceLister(), cfg.NodeName, numberOfWorkers) + go RunController(ctx, provider, cfg.NodeName, numberOfWorkers) } return provider, err }), @@ -114,10 +112,10 @@ func main() { } // RunController starts controllers for objects needed to be synced -func RunController(ctx context.Context, master, - client kubernetes.Interface, nsLister corelisters.NamespaceLister, hostIP string, +func RunController(ctx context.Context, p *k8sprovider.VirtualK8S, hostIP string, workers int) *controllers.ServiceController { - + master := p.GetMaster() + client := p.GetClient() masterInformer := kubeinformers.NewSharedInformerFactory(master, 0) if masterInformer == nil { return nil @@ -136,10 +134,10 @@ func RunController(ctx context.Context, master, } switch c { case "PVControllers": - pvCtrl := buildPVController(master, client, masterInformer, clientInformer, hostIP) + pvCtrl := controllers.NewPVController(master, client, masterInformer, clientInformer, hostIP) runningControllers = append(runningControllers, pvCtrl) case "ServiceControllers": - serviceCtrl := buildServiceController(master, client, masterInformer, clientInformer, nsLister) + serviceCtrl := controllers.NewServiceController(master, client, masterInformer, clientInformer, p.GetNameSpaceLister()) runningControllers = append(runningControllers, serviceCtrl) default: klog.Warningf("Skip: %v", c) @@ -154,39 +152,6 @@ func RunController(ctx context.Context, master, return nil } -func buildServiceController(master, client kubernetes.Interface, masterInformer, - clientInformer kubeinformers.SharedInformerFactory, - nsLister corelisters.NamespaceLister) controllers.Controller { - // master - serviceInformer := masterInformer.Core().V1().Services() - endpointsInformer := masterInformer.Core().V1().Endpoints() - // client - clientServiceInformer := clientInformer.Core().V1().Services() - clientEndpointsInformer := clientInformer.Core().V1().Endpoints() - - serviceRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second) - endpointsRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second) - return controllers.NewServiceController(master, client, serviceInformer, endpointsInformer, - clientServiceInformer, clientEndpointsInformer, nsLister, serviceRateLimiter, endpointsRateLimiter) -} - -func buildPVController(master, client kubernetes.Interface, masterInformer, - clientInformer kubeinformers.SharedInformerFactory, hostIP string) controllers.Controller { - - pvcInformer := masterInformer.Core().V1().PersistentVolumeClaims() - pvInformer := masterInformer.Core().V1().PersistentVolumes() - - clientPVCInformer := clientInformer.Core().V1().PersistentVolumeClaims() - clientPVInformer := clientInformer.Core().V1().PersistentVolumes() - - pvcRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second) - pvRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second) - - return controllers.NewPVController(master, client, pvcInformer, pvInformer, - clientPVCInformer, - clientPVInformer, pvcRateLimiter, pvRateLimiter, hostIP) -} - func buildCommonControllers(client kubernetes.Interface, masterInformer, clientInformer kubeinformers.SharedInformerFactory) controllers.Controller { diff --git a/pkg/controllers/pv_controller.go b/pkg/controllers/pv_controller.go index 22eda23..a5e4ea6 100644 --- a/pkg/controllers/pv_controller.go +++ b/pkg/controllers/pv_controller.go @@ -27,7 +27,7 @@ import ( mergetypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -67,13 +67,19 @@ type PVController struct { // NewPVController returns a new *PVController func NewPVController(master kubernetes.Interface, client kubernetes.Interface, - pvcInformer coreinformers.PersistentVolumeClaimInformer, pvInformer coreinformers.PersistentVolumeInformer, - clientPVCInformer coreinformers.PersistentVolumeClaimInformer, clientPVInformer coreinformers.PersistentVolumeInformer, - pvcRateLimiter, pvRateLimiter workqueue.RateLimiter, hostIP string) Controller { + masterInformer, clientInformer informers.SharedInformerFactory, hostIP string) Controller { broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: master.CoreV1().Events(v1.NamespaceAll)}) var eventRecorder record.EventRecorder eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "virtual-kubelet"}) + pvcInformer := masterInformer.Core().V1().PersistentVolumeClaims() + pvInformer := masterInformer.Core().V1().PersistentVolumes() + + clientPVCInformer := clientInformer.Core().V1().PersistentVolumeClaims() + clientPVInformer := clientInformer.Core().V1().PersistentVolumes() + + pvcRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second) + pvRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second) ctrl := &PVController{ master: master, client: client, diff --git a/pkg/controllers/service_controller.go b/pkg/controllers/service_controller.go index 442107e..68e9487 100644 --- a/pkg/controllers/service_controller.go +++ b/pkg/controllers/service_controller.go @@ -28,7 +28,7 @@ import ( mergetypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -62,14 +62,20 @@ type ServiceController struct { // NewServiceController returns a new *ServiceController func NewServiceController(master kubernetes.Interface, client kubernetes.Interface, - serviceInformer coreinformers.ServiceInformer, endpointsInformer coreinformers.EndpointsInformer, - clientServiceInformer coreinformers.ServiceInformer, clientEndpointsInformer coreinformers.EndpointsInformer, - nsLister corelisters.NamespaceLister, - serviceRateLimiter, endpointsRateLimiter workqueue.RateLimiter) Controller { + masterInformer, clientInformer informers.SharedInformerFactory, + nsLister corelisters.NamespaceLister) Controller { broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: master.CoreV1().Events(v1.NamespaceAll)}) var eventRecorder record.EventRecorder eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "virtual-kubelet"}) + serviceRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second) + endpointsRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second) + // master + serviceInformer := masterInformer.Core().V1().Services() + endpointsInformer := masterInformer.Core().V1().Endpoints() + // client + clientServiceInformer := clientInformer.Core().V1().Services() + clientEndpointsInformer := clientInformer.Core().V1().Endpoints() ctrl := &ServiceController{ master: master, client: client, diff --git a/pkg/controllers/service_controller_test.go b/pkg/controllers/service_controller_test.go index a184e0c..fd4b990 100644 --- a/pkg/controllers/service_controller_test.go +++ b/pkg/controllers/service_controller_test.go @@ -28,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/controller" ) @@ -424,19 +423,8 @@ func newServiceController() *svcTestBase { clientInformer := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) masterInformer := informers.NewSharedInformerFactory(master, controller.NoResyncPeriodFunc()) - serviceInformer := masterInformer.Core().V1().Services() - endPointsInformer := masterInformer.Core().V1().Endpoints() - nsLister := masterInformer.Core().V1().Namespaces().Lister() - - clientServiceInformer := clientInformer.Core().V1().Services() - clientEndPointsInformer := clientInformer.Core().V1().Endpoints() - - serviceRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second) - endPointsRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second) - - controller := NewServiceController(master, client, serviceInformer, endPointsInformer, clientServiceInformer, - clientEndPointsInformer, nsLister, serviceRateLimiter, endPointsRateLimiter) + controller := NewServiceController(master, client, masterInformer, clientInformer, nsLister) c := controller.(*ServiceController) return &svcTestBase{ c: c, diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go index 8e6ec73..d89783c 100644 --- a/pkg/descheduler/descheduler.go +++ b/pkg/descheduler/descheduler.go @@ -99,7 +99,6 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer podEvictor := evictions.NewPodEvictor( rs.Client, evictionPolicyGroupVersion, - rs.DryRun, rs.MaxNoOfPodsToEvictPerNode, nodes, unschedulableCache, ) diff --git a/pkg/descheduler/evictions/evictions.go b/pkg/descheduler/evictions/evictions.go index 5d127bb..5964306 100644 --- a/pkg/descheduler/evictions/evictions.go +++ b/pkg/descheduler/evictions/evictions.go @@ -66,7 +66,6 @@ type PodEvictor struct { func NewPodEvictor( client clientset.Interface, policyGroupVersion string, - dryRun bool, maxPodsToEvict int, nodes []*v1.Node, unschedulableCache *util.UnschedulableCache) *PodEvictor { var nodePodCount = make(nodePodEvictedCount) @@ -90,7 +89,7 @@ func NewPodEvictor( return &PodEvictor{ client: client, policyGroupVersion: policyGroupVersion, - dryRun: dryRun, + dryRun: false, maxPodsToEvict: maxPodsToEvict, nodepodCount: nodePodCount, nodeNum: virtualCount,