From 872340c87cb2e0db84fcd52f9669b99333a1d21f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8C=83=E6=97=A5=E6=98=8E?= Date: Sun, 20 Jun 2021 19:51:07 +0800 Subject: [PATCH] clean up vpc service --- pkg/controller/controller.go | 24 ++++----- pkg/controller/gc.go | 19 ------- pkg/controller/service.go | 98 ++++++++++++++---------------------- pkg/controller/vpc.go | 2 +- 4 files changed, 50 insertions(+), 93 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index bd51437fe17..302bdf06df2 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -92,11 +92,10 @@ type Controller struct { updateNodeQueue workqueue.RateLimitingInterface deleteNodeQueue workqueue.RateLimitingInterface - servicesLister v1.ServiceLister - serviceSynced cache.InformerSynced - deleteTcpServiceQueue workqueue.RateLimitingInterface - deleteUdpServiceQueue workqueue.RateLimitingInterface - updateServiceQueue workqueue.RateLimitingInterface + servicesLister v1.ServiceLister + serviceSynced cache.InformerSynced + deleteServiceQueue workqueue.RateLimitingInterface + updateServiceQueue workqueue.RateLimitingInterface endpointsLister v1.EndpointsLister endpointsSynced cache.InformerSynced @@ -210,11 +209,10 @@ func NewController(config *Configuration) *Controller { updateNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNode"), deleteNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNode"), - servicesLister: serviceInformer.Lister(), - serviceSynced: serviceInformer.Informer().HasSynced, - deleteTcpServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteTcpService"), - deleteUdpServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteUdpService"), - updateServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateService"), + servicesLister: serviceInformer.Lister(), + serviceSynced: serviceInformer.Informer().HasSynced, + deleteServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteService"), + updateServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateService"), endpointsLister: endpointInformer.Lister(), endpointsSynced: endpointInformer.Informer().HasSynced, @@ -372,8 +370,7 @@ func (c *Controller) shutdown() { c.updateNodeQueue.ShutDown() c.deleteNodeQueue.ShutDown() - c.deleteTcpServiceQueue.ShutDown() - c.deleteUdpServiceQueue.ShutDown() + c.deleteServiceQueue.ShutDown() c.updateServiceQueue.ShutDown() c.updateEndpointQueue.ShutDown() @@ -460,8 +457,7 @@ func (c *Controller) startWorkers(stopCh <-chan struct{}) { go wait.Until(c.runUpdateVpcStatusWorker, time.Second, stopCh) // run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer - go wait.Until(c.runDeleteTcpServiceWorker, time.Second, stopCh) - go wait.Until(c.runDeleteUdpServiceWorker, time.Second, stopCh) + go wait.Until(c.runDeleteServiceWorker, time.Second, stopCh) for i := 0; i < c.config.WorkerNum; i++ { go wait.Until(c.runAddPodWorker, time.Second, stopCh) go wait.Until(c.runDeletePodWorker, time.Second, stopCh) diff --git a/pkg/controller/gc.go b/pkg/controller/gc.go index fa7bc3c3d82..7fd895325f1 100644 --- a/pkg/controller/gc.go +++ b/pkg/controller/gc.go @@ -279,25 +279,6 @@ func (c *Controller) gcLoadBalancer() error { } } - lbUuid, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpLoadBalancer) - if err != nil { - klog.Errorf("failed to get lb %v", err) - } - vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid) - if err != nil { - klog.Errorf("failed to get tcp lb vips %v", err) - return err - } - for vip := range vips { - if !util.IsStringIn(vip, tcpVips) { - err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer) - if err != nil { - klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err) - return err - } - } - } - vpcs, err := c.vpcsLister.List(labels.Everything()) if err != nil { klog.Errorf("failed to list vpc, %v", err) diff --git a/pkg/controller/service.go b/pkg/controller/service.go index 7a1bc3ef135..93853b040e0 100644 --- a/pkg/controller/service.go +++ b/pkg/controller/service.go @@ -15,19 +15,28 @@ import ( "github.com/kubeovn/kube-ovn/pkg/util" ) +type vpcService struct { + Vip string + Vpc string + Protocol v1.Protocol +} + func (c *Controller) enqueueDeleteService(obj interface{}) { if !c.isLeader() { return } svc := obj.(*v1.Service) - klog.V(3).Infof("enqueue delete service %s/%s", svc.Namespace, svc.Name) + //klog.V(3).Infof("enqueue delete service %s/%s", svc.Namespace, svc.Name) + klog.Infof("enqueue delete service %s/%s", svc.Namespace, svc.Name) if svc.Spec.ClusterIP != v1.ClusterIPNone && svc.Spec.ClusterIP != "" { for _, port := range svc.Spec.Ports { - if port.Protocol == v1.ProtocolTCP { - c.deleteTcpServiceQueue.Add(fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, port.Port)) - } else if port.Protocol == v1.ProtocolUDP { - c.deleteUdpServiceQueue.Add(fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, port.Port)) + vpcSvc := &vpcService{ + Vip: fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, port.Port), + Protocol: port.Protocol, + Vpc: svc.Annotations[util.VpcAnnotation], } + klog.Infof("delete vpc service %v", vpcSvc) + c.deleteServiceQueue.Add(vpcSvc) } } } @@ -52,13 +61,8 @@ func (c *Controller) enqueueUpdateService(old, new interface{}) { c.updateServiceQueue.Add(key) } -func (c *Controller) runDeleteTcpServiceWorker() { - for c.processNextDeleteTcpServiceWorkItem() { - } -} - -func (c *Controller) runDeleteUdpServiceWorker() { - for c.processNextDeleteUdpServiceWorkItem() { +func (c *Controller) runDeleteServiceWorker() { + for c.processNextDeleteServiceWorkItem() { } } @@ -67,58 +71,28 @@ func (c *Controller) runUpdateServiceWorker() { } } -func (c *Controller) processNextDeleteTcpServiceWorkItem() bool { - obj, shutdown := c.deleteTcpServiceQueue.Get() +func (c *Controller) processNextDeleteServiceWorkItem() bool { + obj, shutdown := c.deleteServiceQueue.Get() if shutdown { return false } err := func(obj interface{}) error { - defer c.deleteTcpServiceQueue.Done(obj) - var key string + defer c.deleteServiceQueue.Done(obj) + var vpcSvc *vpcService var ok bool - if key, ok = obj.(string); !ok { - c.deleteTcpServiceQueue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + if vpcSvc, ok = obj.(*vpcService); !ok { + c.deleteServiceQueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected vpcService in workqueue but got %#v", obj)) return nil } - if err := c.handleDeleteService(key, v1.ProtocolTCP); err != nil { - c.deleteTcpServiceQueue.AddRateLimited(key) - return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) - } - c.deleteTcpServiceQueue.Forget(obj) - return nil - }(obj) - - if err != nil { - utilruntime.HandleError(err) - return true - } - return true -} - -func (c *Controller) processNextDeleteUdpServiceWorkItem() bool { - obj, shutdown := c.deleteUdpServiceQueue.Get() - if shutdown { - return false - } - - err := func(obj interface{}) error { - defer c.deleteUdpServiceQueue.Done(obj) - var key string - var ok bool - if key, ok = obj.(string); !ok { - c.deleteUdpServiceQueue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil + if err := c.handleDeleteService(vpcSvc); err != nil { + c.deleteServiceQueue.AddRateLimited(obj) + return fmt.Errorf("error syncing '%s': %s, requeuing", vpcSvc.Vip, err.Error()) } - if err := c.handleDeleteService(key, v1.ProtocolUDP); err != nil { - c.deleteUdpServiceQueue.AddRateLimited(key) - return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) - } - c.deleteUdpServiceQueue.Forget(obj) + c.deleteServiceQueue.Forget(obj) return nil }(obj) @@ -160,29 +134,35 @@ func (c *Controller) processNextUpdateServiceWorkItem() bool { return true } -func (c *Controller) handleDeleteService(vip string, protocol v1.Protocol) error { +func (c *Controller) handleDeleteService(service *vpcService) error { svcs, err := c.servicesLister.Services(v1.NamespaceAll).List(labels.Everything()) if err != nil { klog.Errorf("failed to list svc, %v", err) return err } for _, svc := range svcs { - if svc.Spec.ClusterIP == parseVipAddr(vip) { + if svc.Spec.ClusterIP == parseVipAddr(service.Vip) { return nil } } - if protocol == v1.ProtocolTCP { - if err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer); err != nil { + vpcLbConfig := c.GenVpcLoadBalancer(service.Vpc) + vip := service.Vip + if service.Protocol == v1.ProtocolTCP { + if err := c.ovnClient.DeleteLoadBalancerVip(vip, vpcLbConfig.TcpLoadBalancer); err != nil { klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err) return err } - if err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpSessionLoadBalancer); err != nil { + if err := c.ovnClient.DeleteLoadBalancerVip(vip, vpcLbConfig.TcpSessLoadBalancer); err != nil { klog.Errorf("failed to delete vip %s from tcp session lb, %v", vip, err) return err } } else { - if err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpSessionLoadBalancer); err != nil { + if err := c.ovnClient.DeleteLoadBalancerVip(vip, vpcLbConfig.UdpLoadBalancer); err != nil { + klog.Errorf("failed to delete vip %s from udp lb, %v", vip, err) + return err + } + if err := c.ovnClient.DeleteLoadBalancerVip(vip, vpcLbConfig.UdpSessLoadBalancer); err != nil { klog.Errorf("failed to delete vip %s from udp session lb, %v", vip, err) return err } diff --git a/pkg/controller/vpc.go b/pkg/controller/vpc.go index b79e880c19a..db3eabac040 100644 --- a/pkg/controller/vpc.go +++ b/pkg/controller/vpc.go @@ -164,7 +164,7 @@ type VpcLoadBalancer struct { } func (c *Controller) GenVpcLoadBalancer(vpcKey string) *VpcLoadBalancer { - if vpcKey == util.DefaultVpc { + if vpcKey == util.DefaultVpc || vpcKey == "" { return &VpcLoadBalancer{ TcpLoadBalancer: c.config.ClusterTcpLoadBalancer, TcpSessLoadBalancer: c.config.ClusterTcpSessionLoadBalancer,