Enable 'direct-pod-ip' mode for LB svc
chestack committed Jun 20, 2022
1 parent bf76958 commit 04425e5
Showing 5 changed files with 133 additions and 2 deletions.
2 changes: 2 additions & 0 deletions app/core.go
@@ -85,8 +85,10 @@ func startServiceController(ctx context.Context, initContext ControllerInitConte
completedConfig.ClientBuilder.ClientOrDie(initContext.ClientName),
completedConfig.SharedInformers.Core().V1().Services(),
completedConfig.SharedInformers.Core().V1().Nodes(),
completedConfig.SharedInformers.Core().V1().Endpoints(),
completedConfig.ComponentConfig.KubeCloudShared.ClusterName,
utilfeature.DefaultFeatureGate,
completedConfig.ComponentConfig.ServiceController.DirectPodIP,
)
if err != nil {
// This error shouldn't fail. It lives like this as a legacy.
5 changes: 5 additions & 0 deletions controllers/service/config/types.go
@@ -22,4 +22,9 @@ type ServiceControllerConfiguration struct {
// allowed to sync concurrently. Larger number = more responsive service
// management, but more CPU (and network) load.
ConcurrentServiceSyncs int32

// DirectPodIP enables the direct-pod-ip mode for load balancer services.
// When true, the load balancer uses pod IP addresses as pool members
// instead of node addresses.
DirectPodIP bool
}
124 changes: 123 additions & 1 deletion controllers/service/controller.go
@@ -58,6 +58,8 @@ const (
// ToBeDeletedTaint is a taint used by the Cluster Autoscaler before marking a node for deletion. Defined in
// https://github.com/kubernetes/autoscaler/blob/e80ab518340f88f364fe3ef063f8303755125971/cluster-autoscaler/utils/deletetaint/delete.go#L36
ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"

serviceAnnotationLoadBalancerDirectPodIP = "loadbalancer.openstack.org/direct-pod-ip"
)

type cachedService struct {
@@ -90,13 +92,20 @@ type Controller struct {
// services that need to be synced
queue workqueue.RateLimitingInterface

endpointLister corelisters.EndpointsLister
endpointListerSynced cache.InformerSynced
// endpoints that need to be synced
endpointsQueue workqueue.RateLimitingInterface

// nodeSyncLock ensures there is only one instance of triggerNodeSync getting executed at one time
// and protects internal states (needFullSync) of nodeSync
nodeSyncLock sync.Mutex
// nodeSyncCh triggers nodeSyncLoop to run
nodeSyncCh chan interface{}
// needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services.
needFullSync bool

directPodIP bool
}

// New returns a new service controller to keep cloud provider service resources
@@ -106,8 +115,10 @@ func New(
kubeClient clientset.Interface,
serviceInformer coreinformers.ServiceInformer,
nodeInformer coreinformers.NodeInformer,
endpointInformer coreinformers.EndpointsInformer,
clusterName string,
featureGate featuregate.FeatureGate,
directPodIP bool,
) (*Controller, error) {
broadcaster := record.NewBroadcaster()
broadcaster.StartStructuredLogging(0)
@@ -134,6 +145,10 @@ func New(
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
// nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached.
nodeSyncCh: make(chan interface{}, 1),
endpointLister: endpointInformer.Lister(),
endpointListerSynced: endpointInformer.Informer().HasSynced,
endpointsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "endpoint"),
directPodIP: directPodIP,
}

serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -161,6 +176,10 @@ func New(
s.serviceLister = serviceInformer.Lister()
s.serviceListerSynced = serviceInformer.Informer().HasSynced

endpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: s.updateEndpoint,
})

nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(cur interface{}) {
@@ -189,7 +208,6 @@ func New(
},
time.Duration(0),
)

if err := s.init(); err != nil {
return nil, err
}
@@ -229,6 +247,7 @@ func (s *Controller) enqueueService(obj interface{}) {
func (s *Controller) Run(ctx context.Context, workers int) {
defer runtime.HandleCrash()
defer s.queue.ShutDown()
defer s.endpointsQueue.ShutDown()

klog.Info("Starting service controller")
defer klog.Info("Shutting down service controller")
@@ -239,6 +258,7 @@ func (s *Controller) Run(ctx context.Context, workers int) {

for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, s.worker, time.Second)
go wait.UntilWithContext(ctx, s.endpointWorker, time.Second)
}

go s.nodeSyncLoop(ctx, workers)
@@ -284,6 +304,50 @@ func (s *Controller) worker(ctx context.Context) {
}
}

// endpointWorker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that updateService is never invoked concurrently with the same key.
func (s *Controller) endpointWorker(ctx context.Context) {
for s.processNextEndpointWorkItem(ctx) {
}
}

func (s *Controller) processNextEndpointWorkItem(ctx context.Context) bool {
key, quit := s.endpointsQueue.Get()
if quit {
return false
}
defer s.endpointsQueue.Done(key)

err := s.updateService(ctx, key.(string))
if err == nil {
s.endpointsQueue.Forget(key)
return true
}

runtime.HandleError(fmt.Errorf("error updating endpoints for service %v (will retry): %v", key, err))
s.endpointsQueue.AddRateLimited(key)
return true
}

// updateService calls UpdateLoadBalancer so the cloud provider can refresh
// the pool members of the service's load balancer.
func (s *Controller) updateService(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
service, err := s.serviceLister.Services(namespace).Get(name)
if err != nil {
klog.Errorf("Failed to get service %s with error %v", name, err)
return err
}

// In direct-pod-ip mode the load balancer is expected to use pod IP
// addresses as pool members instead of node addresses, so an empty node
// slice is passed here.
var emptyNodes []*v1.Node
return s.balancer.UpdateLoadBalancer(ctx, s.clusterName, service, emptyNodes)
}

// nodeSyncLoop takes nodeSync signal and triggers nodeSync
func (s *Controller) nodeSyncLoop(ctx context.Context, workers int) {
klog.V(4).Info("nodeSyncLoop Started")
@@ -600,6 +664,49 @@ func (s *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service)
return false
}

// updateEndpoint handles Endpoints update events and enqueues the owning
// service when its load balancer pool members need to be refreshed.
func (s *Controller) updateEndpoint(old, new interface{}) {
oldep := old.(*v1.Endpoints)
newep := new.(*v1.Endpoints)

klog.V(4).Infof("Endpoints %s/%s updated", newep.Namespace, newep.Name)

// An Endpoints object shares its namespace and name with its service.
name := newep.Name
namespace := newep.Namespace

service, err := s.serviceLister.Services(namespace).Get(name)
if errors.IsNotFound(err) {
klog.Warningf("Failed to get service %s with error %v, no service update needed", name, err)
return
} else if err != nil {
klog.Errorf("Failed to get service %s with error %v", name, err)
return
}

if service.Spec.Type == v1.ServiceTypeLoadBalancer && s.ifDirectPodIPMode(service) {
if !reflect.DeepEqual(oldep.Subsets, newep.Subsets) {
klog.V(4).Infof("Endpoints of service %s/%s changed, need to update the load balancer", namespace, name)
s.enqueueEndpoint(new)
}
}
}

// obj could be an *v1.Endpoints, or a DeletionFinalStateUnknown marker item.
func (s *Controller) enqueueEndpoint(obj interface{}) {
klog.Infof("UUUUUUUUUUUUUUUUUUUUUUUU update endpoint %s", obj)
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
return
}
s.endpointsQueue.Add(key)
}

func getPortsForLB(service *v1.Service) []*v1.ServicePort {
ports := []*v1.ServicePort{}
for i := range service.Spec.Ports {
@@ -766,6 +873,11 @@ func (s *Controller) nodeSyncService(svc *v1.Service) bool {
if svc == nil || !wantsLoadBalancer(svc) {
return false
}
if s.ifDirectPodIPMode(svc) {
klog.Infof("service %s/%s is direct-pod-ip mode, skip sync nodes", svc.Namespace, svc.Name)
return false
}

klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
hosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate())
if err != nil {
@@ -989,3 +1101,13 @@ func listWithPredicate(nodeLister corelisters.NodeLister, predicate NodeConditio

return filtered, nil
}

func (s *Controller) ifDirectPodIPMode(service *v1.Service) bool {
if s.directPodIP {
return true
}
if annotationValue, ok := service.Annotations[serviceAnnotationLoadBalancerDirectPodIP]; ok && annotationValue == "true" {
return true
}
return false
}
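
The annotation checked by ifDirectPodIPMode lets an individual Service opt into direct-pod-ip mode even when the controller-wide flag is disabled. Below is a minimal sketch of such a Service; the name, namespace, and port are illustrative and not part of this commit.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A LoadBalancer Service annotated for direct-pod-ip mode; with this
	// annotation ifDirectPodIPMode returns true regardless of the
	// --direct-pod-ip flag value.
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "web",
			Namespace: "default",
			Annotations: map[string]string{
				"loadbalancer.openstack.org/direct-pod-ip": "true",
			},
		},
		Spec: v1.ServiceSpec{
			Type:  v1.ServiceTypeLoadBalancer,
			Ports: []v1.ServicePort{{Port: 80}},
		},
	}
	fmt.Println(svc.Annotations["loadbalancer.openstack.org/direct-pod-ip"])
}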
2 changes: 2 additions & 0 deletions options/servicecontroller.go
@@ -33,6 +33,7 @@ func (o *ServiceControllerOptions) AddFlags(fs *pflag.FlagSet) {
}

fs.Int32Var(&o.ConcurrentServiceSyncs, "concurrent-service-syncs", o.ConcurrentServiceSyncs, "The number of services that are allowed to sync concurrently. Larger number = more responsive service management, but more CPU (and network) load")
fs.BoolVar(&o.DirectPodIP, "direct-pod-ip",true, "If 'true' configured, load balancer will use pods ip addresses as pool members instead of nodes addresses.")
}

// ApplyTo fills up ServiceController config with options.
@@ -42,6 +43,7 @@ func (o *ServiceControllerOptions) ApplyTo(cfg *serviceconfig.ServiceControllerC
}

cfg.ConcurrentServiceSyncs = o.ConcurrentServiceSyncs
cfg.DirectPodIP = o.DirectPodIP

return nil
}
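
As a rough sketch of how the new flag reaches the controller configuration through AddFlags and ApplyTo, the snippet below assumes the upstream k8s.io/cloud-provider import paths and that ServiceControllerOptions embeds *ServiceControllerConfiguration; neither assumption is shown in this diff.

package main

import (
	"fmt"

	"github.com/spf13/pflag"

	serviceconfig "k8s.io/cloud-provider/controllers/service/config"
	"k8s.io/cloud-provider/options"
)

func main() {
	// Build the options with an embedded configuration so AddFlags has
	// somewhere to bind --direct-pod-ip and --concurrent-service-syncs.
	opts := &options.ServiceControllerOptions{
		ServiceControllerConfiguration: &serviceconfig.ServiceControllerConfiguration{},
	}

	fs := pflag.NewFlagSet("demo", pflag.ContinueOnError)
	opts.AddFlags(fs)

	// The flag defaults to true in this commit; an operator can disable the
	// cluster-wide mode and still enable it per service via the
	// loadbalancer.openstack.org/direct-pod-ip annotation.
	_ = fs.Parse([]string{"--direct-pod-ip=false"})

	cfg := &serviceconfig.ServiceControllerConfiguration{}
	if err := opts.ApplyTo(cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.DirectPodIP) // false
}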
2 changes: 1 addition & 1 deletion service/helpers/helper.go
@@ -181,4 +181,4 @@ func ingressEqual(lhs, rhs *v1.LoadBalancerIngress) bool {
return false
}
return true
}
}
