diff --git a/go.sum b/go.sum index e88164327..6f75d77ac 100644 --- a/go.sum +++ b/go.sum @@ -86,7 +86,6 @@ github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSY github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= -github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk= github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE= github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc= @@ -351,7 +350,6 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4= github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= @@ -394,7 +392,6 @@ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dvyukov/go-fuzz v0.0.0-20210914135545-4980593459a1/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw= -github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= @@ -1044,7 +1041,6 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= -github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= @@ -1117,7 +1113,6 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v0.0.0-20180303142811-b89eecf5ca5d/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -1845,7 +1840,6 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= -gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= helm.sh/helm/v3 v3.7.1/go.mod h1:3eOeBD3Z+O/ELiuu19zynZSN8jP1ErXLuyP21SZeMq8= diff --git a/pkg/bootstrap/server.go b/pkg/bootstrap/server.go index 302e95bb7..22abe32eb 100644 --- a/pkg/bootstrap/server.go +++ b/pkg/bootstrap/server.go @@ -21,15 +21,15 @@ import ( "github.com/aeraki-mesh/aeraki/pkg/leaderelection" - "github.com/aeraki-mesh/aeraki/client-go/pkg/clientset/versioned/scheme" + aerakischeme "github.com/aeraki-mesh/aeraki/client-go/pkg/clientset/versioned/scheme" "github.com/aeraki-mesh/aeraki/pkg/config" - "github.com/aeraki-mesh/aeraki/pkg/config/serviceentry" + "github.com/aeraki-mesh/aeraki/pkg/controller" "github.com/aeraki-mesh/aeraki/pkg/envoyfilter" - "github.com/aeraki-mesh/aeraki/pkg/kube/controller" "github.com/aeraki-mesh/aeraki/pkg/model/protocol" "github.com/aeraki-mesh/aeraki/pkg/xds" "github.com/aeraki-mesh/aeraki/plugin/dubbo" "github.com/aeraki-mesh/aeraki/plugin/redis" + istioscheme "istio.io/client-go/pkg/apis/networking/v1alpha3" "istio.io/client-go/pkg/clientset/versioned" "istio.io/istio/pilot/pkg/model" istioconfig "istio.io/istio/pkg/config" @@ -48,15 +48,15 @@ var ( // Server contains the runtime configuration for the Aeraki service. type Server struct { - args *AerakiArgs - kubeClient kubernetes.Interface - configController *config.Controller - serviceEntryController *serviceentry.Controller - envoyFilterController *envoyfilter.Controller - xdsCacheMgr *xds.CacheMgr - xdsServer *xds.Server - crdCtrlMgr manager.Manager - stopCRDController func() + args *AerakiArgs + kubeClient kubernetes.Interface + configController *config.Controller + envoyFilterController *envoyfilter.Controller + xdsCacheMgr *xds.CacheMgr + xdsServer *xds.Server + scalableCtrlMgr manager.Manager + singletonCtrlMgr manager.Manager + stopCRDController func() } // NewServer creates a new Server instance based on the provided arguments. @@ -77,12 +77,6 @@ func NewServer(args *AerakiArgs) (*Server, error) { NameSpace: args.Namespace, }) - // Istio can allocate a VIP for a serviceentry, but the IPs are allocated in a sidecar scope, hence the IP of a - // service is not consistent across sidecar border. - // Since Aeraki is using the VIP of a serviceEntry as match condition when generating EnvoyFilter, - // the VIP must be unique and consistent in the mesh. - serviceEntryController := serviceentry.NewController(client) - // envoyFilterController watches changes on config and create/update corresponding EnvoyFilters envoyFilterController := envoyfilter.NewController(client, configController.Store, args.Protocols, args.EnableEnvoyFilterNSScope) @@ -101,36 +95,42 @@ func NewServer(args *AerakiArgs) (*Server, error) { xdsServer := xds.NewServer(args.XdsAddr, routeCacheMgr) // crdCtrlMgr watches Aeraki CRDs, such as MetaRouter, ApplicationProtocol, etc. - crdCtrlMgr, err := createCrdControllers(args, kubeConfig, envoyFilterController, routeCacheMgr) + scalableCtrlMgr, err := createScalableControllers(args, kubeConfig, envoyFilterController, routeCacheMgr) if err != nil { return nil, err } // routeCacheMgr uses controller manager client to get route configuration in MetaRouters - routeCacheMgr.MetaRouterControllerClient = crdCtrlMgr.GetClient() + routeCacheMgr.MetaRouterControllerClient = scalableCtrlMgr.GetClient() // envoyFilterController uses controller manager client to get the rate limit configuration in MetaRouters - envoyFilterController.MetaRouterControllerClient = crdCtrlMgr.GetClient() + envoyFilterController.MetaRouterControllerClient = scalableCtrlMgr.GetClient() //todo replace config with cached client - cfg := crdCtrlMgr.GetConfig() - args.Protocols[protocol.Dubbo] = dubbo.NewGenerator(crdCtrlMgr.GetConfig()) + cfg := scalableCtrlMgr.GetConfig() + args.Protocols[protocol.Dubbo] = dubbo.NewGenerator(scalableCtrlMgr.GetConfig()) args.Protocols[protocol.Redis] = redis.New(cfg, configController.Store) + // singletonCtrlMgr + singletonCtrlMgr, err := createSingletonControllers(args, kubeConfig) + if err != nil { + return nil, err + } server := &Server{ - args: args, - configController: configController, - envoyFilterController: envoyFilterController, - crdCtrlMgr: crdCtrlMgr, - serviceEntryController: serviceEntryController, - xdsCacheMgr: routeCacheMgr, - xdsServer: xdsServer, + args: args, + configController: configController, + envoyFilterController: envoyFilterController, + scalableCtrlMgr: scalableCtrlMgr, + singletonCtrlMgr: singletonCtrlMgr, + xdsCacheMgr: routeCacheMgr, + xdsServer: xdsServer, } err = server.initKubeClient() return server, err } -func createCrdControllers(args *AerakiArgs, kubeConfig *rest.Config, +// These controllers are horizontally scalable, multiple instances can be deployed to share the load +func createScalableControllers(args *AerakiArgs, kubeConfig *rest.Config, envoyFilterController *envoyfilter.Controller, xdsCacheMgr *xds.CacheMgr) (manager.Manager, error) { - crdCtrlMgr, err := controller.NewManager(kubeConfig, args.Namespace) + mgr, err := controller.NewManager(kubeConfig, args.Namespace, false, "") if err != nil { return nil, err } @@ -143,23 +143,23 @@ func createCrdControllers(args *AerakiArgs, kubeConfig *rest.Config, xdsCacheMgr.UpdateRoute() return nil } - err = controller.AddRedisServiceController(crdCtrlMgr, updateEnvoyFilter) + err = controller.AddRedisServiceController(mgr, updateEnvoyFilter) if err != nil { aerakiLog.Fatalf("could not add RedisServiceController: %e", err) } - err = controller.AddRedisDestinationController(crdCtrlMgr, updateEnvoyFilter) + err = controller.AddRedisDestinationController(mgr, updateEnvoyFilter) if err != nil { aerakiLog.Fatalf("could not add RedisDestinationController: %e", err) } - err = controller.AddDubboAuthorizationPolicyController(crdCtrlMgr, updateEnvoyFilter) + err = controller.AddDubboAuthorizationPolicyController(mgr, updateEnvoyFilter) if err != nil { aerakiLog.Fatalf("could not add DubboAuthorizationPolicyController: %e", err) } - err = controller.AddApplicationProtocolController(crdCtrlMgr, updateEnvoyFilter) + err = controller.AddApplicationProtocolController(mgr, updateEnvoyFilter) if err != nil { aerakiLog.Fatalf("could not add ApplicationProtocolController: %e", err) } - err = controller.AddMetaRouterController(crdCtrlMgr, func() error { + err = controller.AddMetaRouterController(mgr, func() error { if err := updateEnvoyFilter(); err != nil { //MetaRouter Rate limit config will cause update on EnvoyFilters return err } @@ -171,11 +171,34 @@ func createCrdControllers(args *AerakiArgs, kubeConfig *rest.Config, if err != nil { aerakiLog.Fatalf("could not add MetaRouterController: %e", err) } - err = scheme.AddToScheme(crdCtrlMgr.GetScheme()) + err = aerakischeme.AddToScheme(mgr.GetScheme()) + if err != nil { + aerakiLog.Fatalf("could not add schema: %e", err) + } + return mgr, nil +} + +// The Service Entry Controller is used to assign a globally unique VIP to a service entry, +// hence only one instance can get the lock to run +// +// Istio can allocate a VIP for a serviceentry, but the IPs are allocated in a sidecar scope, hence the IP of a +// service is not consistent across sidecar border. +// Since Aeraki is using the VIP of a serviceEntry as match condition when generating EnvoyFilter, +// the VIP must be unique and consistent in the mesh. +func createSingletonControllers(args *AerakiArgs, kubeConfig *rest.Config) (manager.Manager, error) { + mgr, err := controller.NewManager(kubeConfig, args.Namespace, true, leaderelection.AllocateVIPController) + if err != nil { + return nil, err + } + err = controller.AddserviceEntryController(mgr) + if err != nil { + aerakiLog.Fatalf("could not add ServiceEntryController: %e", err) + } + err = istioscheme.AddToScheme(mgr.GetScheme()) if err != nil { aerakiLog.Fatalf("could not add schema: %e", err) } - return crdCtrlMgr, nil + return mgr, nil } // Start starts all components of the Aeraki service. Serving can be canceled at any time by closing the provided stop channel. @@ -194,14 +217,6 @@ func (s *Server) Start(stop <-chan struct{}) { s.envoyFilterController.Run(stop) }).Run(stop) }() - go func() { - leaderelection. - NewLeaderElection(s.args.Namespace, s.args.ServerID, leaderelection.AllocateVIPController, s.kubeClient). - AddRunFunction(func(leaderStop <-chan struct{}) { - aerakiLog.Infof("starting ServiceEntry IP allocation controller") - s.serviceEntryController.Run(stop) - }).Run(stop) - }() } else { aerakiLog.Infof("aeraki is running as a slave, only xds server will be started") } @@ -223,7 +238,16 @@ func (s *Server) Start(stop <-chan struct{}) { ctx, cancel := context.WithCancel(context.Background()) s.stopCRDController = cancel go func() { - _ = s.crdCtrlMgr.Start(ctx) + err := s.scalableCtrlMgr.Start(ctx) + if err != nil { + aerakiLog.Errorf("failed to start controllers: %v", err) + } + }() + go func() { + err := s.singletonCtrlMgr.Start(ctx) + if err != nil { + aerakiLog.Errorf("failed to start controllers: %v", err) + } }() s.waitForShutdown(stop) diff --git a/pkg/controller/serviceentry.go b/pkg/controller/serviceentry.go new file mode 100644 index 000000000..6a379f5fc --- /dev/null +++ b/pkg/controller/serviceentry.go @@ -0,0 +1,206 @@ +// Copyright Aeraki Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "fmt" + "reflect" + "strings" + + "k8s.io/apimachinery/pkg/api/errors" + + "github.com/aeraki-mesh/aeraki/pkg/model" + + "sigs.k8s.io/controller-runtime/pkg/client" + + istionapi "istio.io/api/networking/v1alpha3" + networking "istio.io/client-go/pkg/apis/networking/v1alpha3" + "istio.io/pkg/log" + controllerclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const aerakiFieldManager = "Aeraki" + +var serviceEntryLog = log.RegisterScope("service-entry-controller", "service-entry-controller debugging", 0) + +var ( + serviceEntryPredicates = predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + switch old := e.ObjectOld.(type) { + case *networking.ServiceEntry: + new, ok := e.ObjectNew.(*networking.ServiceEntry) + if !ok { + return false + } + if !reflect.DeepEqual(old.Spec, new.Spec) { + return true + } + default: + return false + } + return false + }, + } +) + +// serviceEntryController allocate VIPs to service entries +type serviceEntryController struct { + client.Client + serviceIPs map[string]client.ObjectKey + maxIP int +} + +// Reconcile will try to trigger once mcp push. +func (r *serviceEntryController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + serviceEntryLog.Infof("reconcile: %s/%s", request.Namespace, request.Name) + + // Fetch the ReplicaSet from the cache + se := &networking.ServiceEntry{} + err := r.Get(ctx, request.NamespacedName, se) + if errors.IsNotFound(err) { + //The service entry has been deleted, the IP will be recycled when we reach 255*255 + return reconcile.Result{}, nil + } + + if err != nil { + return reconcile.Result{}, fmt.Errorf("could not fetch ServiceEntry: %+v", err) + } + + r.autoAllocateIP(request.NamespacedName, se) + return reconcile.Result{}, nil +} + +// AddserviceEntryController adds serviceEntryController +func AddserviceEntryController(mgr manager.Manager) error { + serviceEntryCtrl := &serviceEntryController{ + Client: mgr.GetClient(), + serviceIPs: make(map[string]client.ObjectKey), + } + c, err := controller.New("aeraki-service-entry-controller", mgr, + controller.Options{Reconciler: serviceEntryCtrl}) + if err != nil { + return err + } + // Watch for changes on ServiceEntry CRD + err = c.Watch(&source.Kind{Type: &networking.ServiceEntry{}}, &handler.EnqueueRequestForObject{}, + serviceEntryPredicates) + if err != nil { + return err + } + + serviceEntryLog.Infof("ServiceEntryController (used to allocate VIP for Service Entry) registered") + return nil +} + +// Automatically allocates IPs for service entry services WITHOUT an address field when resolution is not NONE. +// The IPs are allocated from the reserved Class E subnet (240.240.0.0/16) that is not reachable outside the pod. +// When DNS capture is enabled, Envoy will resolve the DNS to these IPs. +// The listeners for TCP services will also be set up on these IPs. +func (c *serviceEntryController) autoAllocateIP(key client.ObjectKey, s *networking.ServiceEntry) { + if s.Spec.Resolution == istionapi.ServiceEntry_NONE { + return + } + + // Check whether the VIP conflicts with existing SEs if this service entry already has one + if len(s.Spec.Addresses) > 0 { + // leave it as it is if the VIP is not in the CIDR range "240.240.0.0/16" + if !strings.HasPrefix(s.Spec.Addresses[0], "240.240") { + return + } + + if existingKey, ok := c.serviceIPs[s.Spec.Addresses[0]]; ok { + // update the vip if it's conflicted with an existing ServiceEntry + if !reflect.DeepEqual(existingKey, key) { + err := c.Get(context.TODO(), existingKey, &networking.ServiceEntry{}) + if err == nil { + serviceEntryLog.Infof("update conflicting vip for serviceEntry %s", s) + s.Spec.Addresses[0] = c.nextAvailableIP() + c.updateServiceEntry(s, key) + } else if errors.IsNotFound(err) { + c.serviceIPs[s.Spec.Addresses[0]] = key + } else { + serviceEntryLog.Errorf("failed to get serviceEntry: %v", err) + } + } + } else { + // store the vip and serviceEntry in the map + c.serviceIPs[s.Spec.Addresses[0]] = key + } + } else { + s.Spec.Addresses = []string{c.nextAvailableIP()} + c.updateServiceEntry(s, key) + } +} + +func (c *serviceEntryController) updateServiceEntry(s *networking.ServiceEntry, key client.ObjectKey) { + err := c.Client.Update(context.TODO(), s, + &controllerclient.UpdateOptions{ + FieldManager: aerakiFieldManager, + }) + if err == nil { + c.serviceIPs[s.Spec.Addresses[0]] = key + serviceEntryLog.Infof("allocate vip for serviceEntry %v", model.Struct2JSON(s)) + } else { + serviceEntryLog.Errorf("failed to update serviceEntry %s, error: %v", s.Name, err) + } +} + +func (c *serviceEntryController) nextAvailableIP() string { + for { + nextIP := c.getNextIP() + key, exists := c.serviceIPs[nextIP] + if exists { + err := c.Get(context.TODO(), key, &networking.ServiceEntry{}) + if err != nil { + if errors.IsNotFound(err) { + delete(c.serviceIPs, nextIP) + return nextIP + } else { + serviceEntryLog.Errorf("failed to get serviceEntry: %v", err) + } + } + } else { + return nextIP + } + } +} + +func (c *serviceEntryController) getNextIP() string { + if c.maxIP%255 == 0 { + c.maxIP++ + } + if c.maxIP >= 255*255 { + serviceEntryLog.Errorf("out of IPs to allocate for service entries, restart from 0") + c.maxIP = 0 + } + thirdOctet := c.maxIP / 255 + fourthOctet := c.maxIP % 255 + c.maxIP++ + return fmt.Sprintf("240.240.%d.%d", thirdOctet, fourthOctet) +} diff --git a/pkg/controller/serviceetnry_test.go b/pkg/controller/serviceetnry_test.go new file mode 100644 index 000000000..13a8b4fac --- /dev/null +++ b/pkg/controller/serviceetnry_test.go @@ -0,0 +1,66 @@ +// Copyright Aeraki Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "testing" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestController_nextAvailableIP(t *testing.T) { + + c := &serviceEntryController{ + serviceIPs: make(map[string]client.ObjectKey), + maxIP: 0, + } + + if got := c.nextAvailableIP(); got != "240.240.0.1" { + t.Errorf("nextAvailableIP() = %v, want %v", got, "240.240.0.1") + } + c.serviceIPs["240.240.0.1"] = client.ObjectKey{ + Namespace: "test", + Name: "service1", + } + + if got := c.nextAvailableIP(); got != "240.240.0.2" { + t.Errorf("nextAvailableIP() = %v, want %v", got, "240.240.0.2") + } + c.serviceIPs["240.240.0.2"] = client.ObjectKey{ + Namespace: "test", + Name: "service2", + } + + if got := c.nextAvailableIP(); got != "240.240.0.3" { + t.Errorf("nextAvailableIP() = %v, want %v", got, "240.240.0.3") + } + c.serviceIPs["240.240.0.3"] = client.ObjectKey{ + Namespace: "test", + Name: "service3", + } + + c.maxIP = 255 + if got := c.nextAvailableIP(); got != "240.240.1.1" { + t.Errorf("nextAvailableIP() = %v, want %v", got, "240.240.1.1") + } + c.maxIP = 256 + if got := c.nextAvailableIP(); got != "240.240.1.1" { + t.Errorf("nextAvailableIP() = %v, want %v", got, "240.240.1.1") + } + c.maxIP = 255*254 + 100 + if got := c.nextAvailableIP(); got != "240.240.254.100" { + t.Errorf("nextAvailableIP() = %v, want %v", got, "240.240.254.100") + } +} diff --git a/pkg/kube/controller/controller.go b/pkg/kube/controller/controller.go index 45d7919d9..282d24c7d 100644 --- a/pkg/kube/controller/controller.go +++ b/pkg/kube/controller/controller.go @@ -23,10 +23,12 @@ import ( var controllerLog = log.RegisterScope("controller", "crd controller", 0) // NewManager create a manager to manager all crd controllers. -func NewManager(kubeConfig *rest.Config, namespace string) (manager.Manager, error) { +func NewManager(kubeConfig *rest.Config, namespace string, leaderElection bool, + leaderElectionID string) (manager.Manager, error) { mgrOpt := manager.Options{ MetricsBindAddress: "0", - LeaderElection: false, + LeaderElection: leaderElection, + LeaderElectionID: leaderElectionID, LeaderElectionNamespace: namespace, } m, err := manager.New(kubeConfig, mgrOpt) @@ -35,30 +37,5 @@ func NewManager(kubeConfig *rest.Config, namespace string) (manager.Manager, err } m.Elected() - - //err = addRedisServiceController(m, triggerPush) - //if err != nil { - // controllerLog.Fatalf("could not add RedisServiceController: %e", err) - //} - //err = addRedisDestinationController(m, triggerPush) - //if err != nil { - // controllerLog.Fatalf("could not add RedisDestinationController: %e", err) - //} - //err = addDubboAuthorizationPolicyController(m, triggerPush) - //if err != nil { - // controllerLog.Fatalf("could not add DubboAuthorizationPolicyController: %e", err) - //} - //err = addApplicationProtocolController(m, triggerPush) - //if err != nil { - // controllerLog.Fatalf("could not add ApplicationProtocolController: %e", err) - //} - //err = addMetaRouterController(m, triggerPush) - //if err != nil { - // controllerLog.Fatalf("could not add MetaRouterController: %e", err) - //} - //err = scheme.AddToScheme(m.GetScheme()) - //if err != nil { - // controllerLog.Fatalf("could not add schema: %e", err) - //} return m, nil }