Skip to content

Commit

Permalink
feat: Support status sync for Gateway API resources (#752)
Browse files Browse the repository at this point in the history
  • Loading branch information
CH3CHO authored Aug 15, 2024
1 parent 5b04201 commit 769da46
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 36 deletions.
76 changes: 71 additions & 5 deletions pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"istio.io/istio/pilot/pkg/server"
"istio.io/istio/pilot/pkg/serviceregistry/aggregate"
kubecontroller "istio.io/istio/pilot/pkg/serviceregistry/kube/controller"
"istio.io/istio/pilot/pkg/serviceregistry/serviceentry"
"istio.io/istio/pilot/pkg/xds"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config"
Expand Down Expand Up @@ -144,6 +145,7 @@ type Server struct {
multiclusterController *multicluster.Controller
configController model.ConfigStoreController
configStores []model.ConfigStoreController
serviceEntryController *serviceentry.Controller
httpServer *http.Server
httpMux *http.ServeMux
grpcServer *grpc.Server
Expand All @@ -156,6 +158,9 @@ type Server struct {
var (
PodNamespace = env.RegisterStringVar("POD_NAMESPACE", "higress-system", "").Get()
PodName = env.RegisterStringVar("POD_NAME", "", "").Get()
// Revision is the value of the Istio control plane revision, e.g. "canary",
// and is the value used by the "istio.io/rev" label.
Revision = env.Register("REVISION", "", "").Get()
)

func NewServer(args *ServerArgs) (*Server, error) {
Expand All @@ -179,7 +184,7 @@ func NewServer(args *ServerArgs) (*Server, error) {
s.initKubeClient,
s.initXdsServer,
s.initHttpServer,
s.initConfigController,
s.initControllers,
s.initRegistryEventHandlers,
s.initAuthenticators,
s.initAutomaticHttps,
Expand All @@ -203,6 +208,10 @@ func NewServer(args *ServerArgs) (*Server, error) {
return s, nil
}

func (s *Server) ServiceController() *aggregate.Controller {
return s.environment.ServiceDiscovery.(*aggregate.Controller)
}

// initRegistryEventHandlers sets up event handlers for config updates
func (s *Server) initRegistryEventHandlers() error {
log.Info("initializing registry event handlers")
Expand All @@ -226,15 +235,14 @@ func (s *Server) initRegistryEventHandlers() error {
return nil
}

func (s *Server) initConfigController() error {
ns := PodNamespace
func (s *Server) initControllers() error {
options := common.Options{
Enable: true,
ClusterId: s.RegistryOptions.KubeOptions.ClusterID,
IngressClass: s.IngressClass,
WatchNamespace: s.WatchNamespace,
EnableStatus: s.EnableStatus,
SystemNamespace: ns,
SystemNamespace: PodNamespace,
GatewaySelectorKey: s.GatewaySelectorKey,
GatewaySelectorValue: s.GatewaySelectorValue,
GatewayHttpPort: s.GatewayHttpPort,
Expand All @@ -246,8 +254,17 @@ func (s *Server) initConfigController() error {

s.initMulticluster(options)
s.initSDSServer(options)
if err := s.initConfigController(options); err != nil {
return fmt.Errorf("error initializing config controller: %v", err)
}
if err := s.initServiceControllers(options); err != nil {
return fmt.Errorf("error initializing service controllers: %v", err)
}
return nil
}

ingressConfig := translation.NewIngressTranslation(s.kubeClient, s.xdsServer, ns, options.ClusterId)
func (s *Server) initConfigController(options common.Options) error {
ingressConfig := translation.NewIngressTranslation(s.environment, s.kubeClient, s.xdsServer, options.SystemNamespace, options.ClusterId)
ingressConfig.AddLocalCluster(options)

s.configStores = append(s.configStores, ingressConfig)
Expand All @@ -269,6 +286,7 @@ func (s *Server) initConfigController() error {
go s.configController.Run(stop)
return nil
})

return nil
}

Expand Down Expand Up @@ -297,6 +315,54 @@ func (s *Server) initSDSServer(options common.Options) {
s.environment.CredentialsController = creds
}

func (s *Server) initServiceControllers(options common.Options) error {
serviceControllers := s.ServiceController()

s.serviceEntryController = serviceentry.NewController(
s.configController, s.xdsServer,
serviceentry.WithClusterID(s.RegistryOptions.KubeOptions.ClusterID),
)
serviceControllers.AddRegistry(s.serviceEntryController)

if err := s.initKubeRegistry(options); err != nil {
return err
}

// Defer running of the service controllers.
s.server.RunComponent("service controllers", func(stop <-chan struct{}) error {
go serviceControllers.Run(stop)
return nil
})

return nil
}

// initKubeRegistry creates all the k8s service controllers under this pilot
func (s *Server) initKubeRegistry(options common.Options) (err error) {
s.RegistryOptions.KubeOptions.ClusterID = options.ClusterId
s.RegistryOptions.KubeOptions.Metrics = s.environment
s.RegistryOptions.KubeOptions.XDSUpdater = s.xdsServer
s.RegistryOptions.KubeOptions.MeshNetworksWatcher = s.environment.NetworksWatcher
s.RegistryOptions.KubeOptions.MeshWatcher = s.environment.Watcher
s.RegistryOptions.KubeOptions.SystemNamespace = options.SystemNamespace
s.RegistryOptions.KubeOptions.MeshServiceController = s.ServiceController()
// pass namespace to k8s service registry
s.RegistryOptions.KubeOptions.DiscoveryNamespacesFilter = s.multiclusterController.DiscoveryNamespacesFilter
s.multiclusterController.AddHandler(kubecontroller.NewMulticluster(PodName,
s.kubeClient.Kube(),
s.RegistryOptions.ClusterRegistriesNamespace,
s.RegistryOptions.KubeOptions,
s.serviceEntryController,
s.configController,
//s.istiodCertBundleWatcher,
nil,
Revision,
false,
s.environment.ClusterLocal(),
s.server))

return
}
func (s *Server) Start(stop <-chan struct{}) error {
if err := s.multiclusterController.Run(stop); err != nil {
return err
Expand Down
7 changes: 5 additions & 2 deletions pkg/ingress/config/ingress_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type IngressConfig struct {
ingressRouteCache istiomodel.IngressRouteCollection
ingressDomainCache istiomodel.IngressDomainCollection

environment *istiomodel.Environment
localKubeClient kube.Client

virtualServiceHandlers []istiomodel.EventHandler
Expand Down Expand Up @@ -155,13 +156,15 @@ type IngressConfig struct {
httpsConfigMgr *cert.ConfigMgr
}

func NewIngressConfig(localKubeClient kube.Client, xdsUpdater istiomodel.XDSUpdater, namespace string, clusterId cluster.ID) *IngressConfig {
func NewIngressConfig(environment *istiomodel.Environment, localKubeClient kube.Client, xdsUpdater istiomodel.XDSUpdater,
namespace string, clusterId cluster.ID) *IngressConfig {
if clusterId == "Kubernetes" {
clusterId = ""
}
config := &IngressConfig{
remoteIngressControllers: make(map[cluster.ID]common.IngressController),
remoteGatewayControllers: make(map[cluster.ID]common.GatewayController),
environment: environment,
localKubeClient: localKubeClient,
XDSUpdater: xdsUpdater,
annotationHandler: annotations.NewAnnotationHandlerManager(),
Expand Down Expand Up @@ -240,7 +243,7 @@ func (m *IngressConfig) AddLocalCluster(options common.Options) {
}
m.remoteIngressControllers[options.ClusterId] = ingressController

m.remoteGatewayControllers[options.ClusterId] = gateway.NewController(m.localKubeClient, options)
m.remoteGatewayControllers[options.ClusterId] = gateway.NewController(m.environment, m.localKubeClient, options)
}

func (m *IngressConfig) List(typ config.GroupVersionKind, namespace string) []config.Config {
Expand Down
42 changes: 30 additions & 12 deletions pkg/ingress/kube/gateway/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
kubecredentials "istio.io/istio/pilot/pkg/credentials/kube"
"istio.io/istio/pilot/pkg/model"
kubecontroller "istio.io/istio/pilot/pkg/serviceregistry/kube/controller"
"istio.io/istio/pilot/pkg/status"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/schema/collection"
Expand All @@ -45,14 +46,16 @@ type gatewayController struct {
destinationRuleHandlers []model.EventHandler
envoyFilterHandlers []model.EventHandler

environment *model.Environment
store model.ConfigStoreController
credsController credentials.MulticlusterController
istioController *istiogateway.Controller
statusManager *status.Manager

resourceUpToDate atomic.Bool
}

func NewController(client kube.Client, options common.Options) common.GatewayController {
func NewController(environment *model.Environment, client kube.Client, options common.Options) common.GatewayController {
domainSuffix := util.GetDomainSuffix()
opts := crdclient.Option{
Revision: bootstrap.Revision,
Expand All @@ -71,22 +74,25 @@ func NewController(client kube.Client, options common.Options) common.GatewayCon
clusterId := options.ClusterId
credsController := kubecredentials.NewMulticluster(clusterId)
credsController.ClusterAdded(&multicluster.Cluster{ID: clusterId, Client: client}, nil)
istioController := istiogateway.NewController(client, store, client.CrdWatcher().WaitForCRD, credsController, kubecontroller.Options{DomainSuffix: domainSuffix})
istioController := istiogateway.NewController(environment, client, store, client.CrdWatcher().WaitForCRD, credsController, kubecontroller.Options{DomainSuffix: domainSuffix})
if options.GatewaySelectorKey != "" {
istioController.DefaultGatewaySelector = map[string]string{options.GatewaySelectorKey: options.GatewaySelectorValue}
}

var statusManager *status.Manager = nil
if options.EnableStatus {
// TODO: Add status sync support
//istioController.SetStatusWrite(true,)
statusManager = status.NewManager(store)
istioController.SetStatusWrite(true, statusManager)
} else {
IngressLog.Infof("Disable status update for cluster %s", clusterId)
}

return &gatewayController{
environment: environment,
store: store,
credsController: credsController,
istioController: istioController,
statusManager: statusManager,
}
}

Expand All @@ -100,10 +106,7 @@ func (g *gatewayController) Get(typ config.GroupVersionKind, name, namespace str

func (g *gatewayController) List(typ config.GroupVersionKind, namespace string) []config.Config {
if g.resourceUpToDate.CompareAndSwap(false, true) {
err := g.istioController.Reconcile(model.NewPushContext())
if err != nil {
IngressLog.Errorf("failed to recompute Gateway API resources: %v", err)
}
_ = g.reconcile()
}
return g.istioController.List(typ, namespace)
}
Expand Down Expand Up @@ -148,6 +151,9 @@ func (g *gatewayController) Run(stop <-chan struct{}) {
})
go g.store.Run(stop)
go g.istioController.Run(stop)
if g.statusManager != nil {
g.statusManager.Start(stop)
}
}

func (g *gatewayController) SetWatchErrorHandler(f func(r *cache.Reflector, err error)) error {
Expand All @@ -158,14 +164,26 @@ func (g *gatewayController) SetWatchErrorHandler(f func(r *cache.Reflector, err
func (g *gatewayController) HasSynced() bool {
ret := g.istioController.HasSynced()
if ret {
err := g.istioController.Reconcile(model.NewPushContext())
if err != nil {
IngressLog.Errorf("failed to recompute Gateway API resources: %v", err)
}
_ = g.reconcile()
}
return ret
}

func (g *gatewayController) reconcile() error {
//ps := model.NewPushContext()
//if err := ps.InitContext(g.environment, nil, nil); err != nil {
// IngressLog.Errorf("failed to init PushContext. use empty PushContext instead: %v", err)
//}
err := g.istioController.Reconcile(model.NewPushContext())
if err != nil {
IngressLog.Errorf("failed to recompute Gateway API resources: %v", err)
}
return err
}

//func (g *gatewayController) initPushContext(ps *model.PushContext) error {
//}

func (g *gatewayController) onEvent(prev config.Config, curr config.Config, event model.Event) {
g.resourceUpToDate.Store(false)

Expand Down
45 changes: 38 additions & 7 deletions pkg/ingress/kube/gateway/istio/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Updated based on Istio codebase by Higress

package istio

import (
Expand All @@ -32,10 +34,11 @@ import (
// GatewayContext contains a minimal subset of push context functionality to be exposed to GatewayAPIControllers
type GatewayContext struct {
ps *model.PushContext
si *serviceIndex
}

func NewGatewayContext(ps *model.PushContext) GatewayContext {
return GatewayContext{ps}
func NewGatewayContext(ps *model.PushContext, si *serviceIndex) GatewayContext {
return GatewayContext{ps, si}
}

// ResolveGatewayInstances attempts to resolve all instances that a gateway will be exposed on.
Expand All @@ -50,6 +53,9 @@ func NewGatewayContext(ps *model.PushContext) GatewayContext {
func (gc GatewayContext) ResolveGatewayInstances(
namespace string,
gwsvcs []string,
// Start - Updated by Higress
gatewaySelector map[string]string,
// End - Updated by Higress
servers []*networking.Server,
) (internal, external, pending, warns []string) {
ports := map[int]struct{}{}
Expand All @@ -60,11 +66,30 @@ func (gc GatewayContext) ResolveGatewayInstances(
foundExternal := sets.New[string]()
foundPending := sets.New[string]()
warnings := []string{}
// Start - Added by Higress
if gatewaySelector != nil && len(gatewaySelector) != 0 {
gwsvcs = append([]string{}, gwsvcs...)
for _, svc := range gc.si.all {
matches := true
for k, v := range gatewaySelector {
if svc.Attributes.Labels[k] != v {
matches = false
break
}
}
if matches {
gwsvcs = append(gwsvcs, string(svc.Hostname))
}
}
}
// End - Added by Higress
for _, g := range gwsvcs {
svc, f := gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(g)][namespace]
// Start - Updated by Higress
svc, f := gc.si.HostnameAndNamespace[host.Name(g)][namespace]
// End - Updated by Higress
if !f {
otherNamespaces := []string{}
for ns := range gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(g)] {
for ns := range gc.si.HostnameAndNamespace[host.Name(g)] {
otherNamespaces = append(otherNamespaces, `"`+ns+`"`) // Wrap in quotes for output
}
if len(otherNamespaces) > 0 {
Expand All @@ -78,7 +103,9 @@ func (gc GatewayContext) ResolveGatewayInstances(
}
svcKey := svc.Key()
for port := range ports {
instances := gc.ps.ServiceInstancesByPort(svc, port, nil)
// Start - Updated by Higress
instances := gc.si.ServiceInstancesByPort(svc, port, nil)
// End - Updated by Higress
if len(instances) > 0 {
foundInternal.Insert(fmt.Sprintf("%s:%d", g, port))
if svc.Attributes.ClusterExternalAddresses.Len() > 0 {
Expand All @@ -93,7 +120,9 @@ func (gc GatewayContext) ResolveGatewayInstances(
}
}
} else {
instancesByPort := gc.ps.ServiceInstances(svcKey)
// Start - Updated by Higress
instancesByPort := gc.si.ServiceInstances(svcKey)
// End - Updated by Higress
if instancesEmpty(instancesByPort) {
warnings = append(warnings, fmt.Sprintf("no instances found for hostname %q", g))
} else {
Expand Down Expand Up @@ -121,7 +150,9 @@ func (gc GatewayContext) ResolveGatewayInstances(
}

func (gc GatewayContext) GetService(hostname, namespace string) *model.Service {
return gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(hostname)][namespace]
// Start - Updated by Higress
return gc.si.HostnameAndNamespace[host.Name(hostname)][namespace]
// End - Updated by Higress
}

func instancesEmpty(m map[int][]*model.ServiceInstance) bool {
Expand Down
Loading

0 comments on commit 769da46

Please sign in to comment.