Skip to content

Commit

Permalink
refactor service entry controller
Browse files Browse the repository at this point in the history
Use controller runtime api to rewrite service entry controller.

Signed-off-by: Xunzhuo <mixdeers@gmail.com>
  • Loading branch information
Xunzhuo committed May 25, 2022
1 parent cabd47e commit c777921
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 81 deletions.
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSY
github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk=
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE=
github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc=
Expand Down Expand Up @@ -351,7 +350,6 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=
github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
Expand Down Expand Up @@ -394,7 +392,6 @@ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dvyukov/go-fuzz v0.0.0-20210914135545-4980593459a1/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
Expand Down Expand Up @@ -1044,7 +1041,6 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw=
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
Expand Down Expand Up @@ -1117,7 +1113,6 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v0.0.0-20180303142811-b89eecf5ca5d/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down Expand Up @@ -1845,7 +1840,6 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0=
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o=
helm.sh/helm/v3 v3.7.1/go.mod h1:3eOeBD3Z+O/ELiuu19zynZSN8jP1ErXLuyP21SZeMq8=
Expand Down
120 changes: 72 additions & 48 deletions pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (

"github.com/aeraki-mesh/aeraki/pkg/leaderelection"

"github.com/aeraki-mesh/aeraki/client-go/pkg/clientset/versioned/scheme"
aerakischeme "github.com/aeraki-mesh/aeraki/client-go/pkg/clientset/versioned/scheme"
"github.com/aeraki-mesh/aeraki/pkg/config"
"github.com/aeraki-mesh/aeraki/pkg/config/serviceentry"
"github.com/aeraki-mesh/aeraki/pkg/controller"
"github.com/aeraki-mesh/aeraki/pkg/envoyfilter"
"github.com/aeraki-mesh/aeraki/pkg/kube/controller"
"github.com/aeraki-mesh/aeraki/pkg/model/protocol"
"github.com/aeraki-mesh/aeraki/pkg/xds"
"github.com/aeraki-mesh/aeraki/plugin/dubbo"
"github.com/aeraki-mesh/aeraki/plugin/redis"
istioscheme "istio.io/client-go/pkg/apis/networking/v1alpha3"
"istio.io/client-go/pkg/clientset/versioned"
"istio.io/istio/pilot/pkg/model"
istioconfig "istio.io/istio/pkg/config"
Expand All @@ -48,15 +48,15 @@ var (

// Server contains the runtime configuration for the Aeraki service.
type Server struct {
args *AerakiArgs
kubeClient kubernetes.Interface
configController *config.Controller
serviceEntryController *serviceentry.Controller
envoyFilterController *envoyfilter.Controller
xdsCacheMgr *xds.CacheMgr
xdsServer *xds.Server
crdCtrlMgr manager.Manager
stopCRDController func()
args *AerakiArgs
kubeClient kubernetes.Interface
configController *config.Controller
envoyFilterController *envoyfilter.Controller
xdsCacheMgr *xds.CacheMgr
xdsServer *xds.Server
scalableCtrlMgr manager.Manager
singletonCtrlMgr manager.Manager
stopCRDController func()
}

// NewServer creates a new Server instance based on the provided arguments.
Expand All @@ -77,12 +77,6 @@ func NewServer(args *AerakiArgs) (*Server, error) {
NameSpace: args.Namespace,
})

// Istio can allocate a VIP for a serviceentry, but the IPs are allocated in a sidecar scope, hence the IP of a
// service is not consistent across sidecar border.
// Since Aeraki is using the VIP of a serviceEntry as match condition when generating EnvoyFilter,
// the VIP must be unique and consistent in the mesh.
serviceEntryController := serviceentry.NewController(client)

// envoyFilterController watches changes on config and create/update corresponding EnvoyFilters
envoyFilterController := envoyfilter.NewController(client, configController.Store, args.Protocols,
args.EnableEnvoyFilterNSScope)
Expand All @@ -101,36 +95,42 @@ func NewServer(args *AerakiArgs) (*Server, error) {
xdsServer := xds.NewServer(args.XdsAddr, routeCacheMgr)

// crdCtrlMgr watches Aeraki CRDs, such as MetaRouter, ApplicationProtocol, etc.
crdCtrlMgr, err := createCrdControllers(args, kubeConfig, envoyFilterController, routeCacheMgr)
scalableCtrlMgr, err := createScalableControllers(args, kubeConfig, envoyFilterController, routeCacheMgr)
if err != nil {
return nil, err
}
// routeCacheMgr uses controller manager client to get route configuration in MetaRouters
routeCacheMgr.MetaRouterControllerClient = crdCtrlMgr.GetClient()
routeCacheMgr.MetaRouterControllerClient = scalableCtrlMgr.GetClient()
// envoyFilterController uses controller manager client to get the rate limit configuration in MetaRouters
envoyFilterController.MetaRouterControllerClient = crdCtrlMgr.GetClient()
envoyFilterController.MetaRouterControllerClient = scalableCtrlMgr.GetClient()

//todo replace config with cached client
cfg := crdCtrlMgr.GetConfig()
args.Protocols[protocol.Dubbo] = dubbo.NewGenerator(crdCtrlMgr.GetConfig())
cfg := scalableCtrlMgr.GetConfig()
args.Protocols[protocol.Dubbo] = dubbo.NewGenerator(scalableCtrlMgr.GetConfig())
args.Protocols[protocol.Redis] = redis.New(cfg, configController.Store)

// singletonCtrlMgr
singletonCtrlMgr, err := createSingletonControllers(args, kubeConfig)
if err != nil {
return nil, err
}
server := &Server{
args: args,
configController: configController,
envoyFilterController: envoyFilterController,
crdCtrlMgr: crdCtrlMgr,
serviceEntryController: serviceEntryController,
xdsCacheMgr: routeCacheMgr,
xdsServer: xdsServer,
args: args,
configController: configController,
envoyFilterController: envoyFilterController,
scalableCtrlMgr: scalableCtrlMgr,
singletonCtrlMgr: singletonCtrlMgr,
xdsCacheMgr: routeCacheMgr,
xdsServer: xdsServer,
}
err = server.initKubeClient()
return server, err
}

func createCrdControllers(args *AerakiArgs, kubeConfig *rest.Config,
// These controllers are horizontally scalable, multiple instances can be deployed to share the load
func createScalableControllers(args *AerakiArgs, kubeConfig *rest.Config,
envoyFilterController *envoyfilter.Controller, xdsCacheMgr *xds.CacheMgr) (manager.Manager, error) {
crdCtrlMgr, err := controller.NewManager(kubeConfig, args.Namespace)
mgr, err := controller.NewManager(kubeConfig, args.Namespace, false, "")
if err != nil {
return nil, err
}
Expand All @@ -143,23 +143,23 @@ func createCrdControllers(args *AerakiArgs, kubeConfig *rest.Config,
xdsCacheMgr.UpdateRoute()
return nil
}
err = controller.AddRedisServiceController(crdCtrlMgr, updateEnvoyFilter)
err = controller.AddRedisServiceController(mgr, updateEnvoyFilter)
if err != nil {
aerakiLog.Fatalf("could not add RedisServiceController: %e", err)
}
err = controller.AddRedisDestinationController(crdCtrlMgr, updateEnvoyFilter)
err = controller.AddRedisDestinationController(mgr, updateEnvoyFilter)
if err != nil {
aerakiLog.Fatalf("could not add RedisDestinationController: %e", err)
}
err = controller.AddDubboAuthorizationPolicyController(crdCtrlMgr, updateEnvoyFilter)
err = controller.AddDubboAuthorizationPolicyController(mgr, updateEnvoyFilter)
if err != nil {
aerakiLog.Fatalf("could not add DubboAuthorizationPolicyController: %e", err)
}
err = controller.AddApplicationProtocolController(crdCtrlMgr, updateEnvoyFilter)
err = controller.AddApplicationProtocolController(mgr, updateEnvoyFilter)
if err != nil {
aerakiLog.Fatalf("could not add ApplicationProtocolController: %e", err)
}
err = controller.AddMetaRouterController(crdCtrlMgr, func() error {
err = controller.AddMetaRouterController(mgr, func() error {
if err := updateEnvoyFilter(); err != nil { //MetaRouter Rate limit config will cause update on EnvoyFilters
return err
}
Expand All @@ -171,11 +171,34 @@ func createCrdControllers(args *AerakiArgs, kubeConfig *rest.Config,
if err != nil {
aerakiLog.Fatalf("could not add MetaRouterController: %e", err)
}
err = scheme.AddToScheme(crdCtrlMgr.GetScheme())
err = aerakischeme.AddToScheme(mgr.GetScheme())
if err != nil {
aerakiLog.Fatalf("could not add schema: %e", err)
}
return mgr, nil
}

// The Service Entry Controller is used to assign a globally unique VIP to a service entry,
// hence only one instance can get the lock to run
//
// Istio can allocate a VIP for a serviceentry, but the IPs are allocated in a sidecar scope, hence the IP of a
// service is not consistent across sidecar border.
// Since Aeraki is using the VIP of a serviceEntry as match condition when generating EnvoyFilter,
// the VIP must be unique and consistent in the mesh.
func createSingletonControllers(args *AerakiArgs, kubeConfig *rest.Config) (manager.Manager, error) {
mgr, err := controller.NewManager(kubeConfig, args.Namespace, true, leaderelection.AllocateVIPController)
if err != nil {
return nil, err
}
err = controller.AddserviceEntryController(mgr)
if err != nil {
aerakiLog.Fatalf("could not add ServiceEntryController: %e", err)
}
err = istioscheme.AddToScheme(mgr.GetScheme())
if err != nil {
aerakiLog.Fatalf("could not add schema: %e", err)
}
return crdCtrlMgr, nil
return mgr, nil
}

// Start starts all components of the Aeraki service. Serving can be canceled at any time by closing the provided stop channel.
Expand All @@ -194,14 +217,6 @@ func (s *Server) Start(stop <-chan struct{}) {
s.envoyFilterController.Run(stop)
}).Run(stop)
}()
go func() {
leaderelection.
NewLeaderElection(s.args.Namespace, s.args.ServerID, leaderelection.AllocateVIPController, s.kubeClient).
AddRunFunction(func(leaderStop <-chan struct{}) {
aerakiLog.Infof("starting ServiceEntry IP allocation controller")
s.serviceEntryController.Run(stop)
}).Run(stop)
}()
} else {
aerakiLog.Infof("aeraki is running as a slave, only xds server will be started")
}
Expand All @@ -223,7 +238,16 @@ func (s *Server) Start(stop <-chan struct{}) {
ctx, cancel := context.WithCancel(context.Background())
s.stopCRDController = cancel
go func() {
_ = s.crdCtrlMgr.Start(ctx)
err := s.scalableCtrlMgr.Start(ctx)
if err != nil {
aerakiLog.Errorf("failed to start controllers: %v", err)
}
}()
go func() {
err := s.singletonCtrlMgr.Start(ctx)
if err != nil {
aerakiLog.Errorf("failed to start controllers: %v", err)
}
}()

s.waitForShutdown(stop)
Expand Down
Loading

0 comments on commit c777921

Please sign in to comment.