Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for EndpointSlices API in master #959

Merged
merged 1 commit into from
May 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ linters:
- govet
- ineffassign
- misspell
- revive
- staticcheck
- structcheck
- typecheck
Expand Down
9 changes: 9 additions & 0 deletions docs/content/en/docs/configuration/command-line.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ The following command-line options are supported:
| [`--sort-backends`](#sort-backends) | [true\|false] | `false` | |
| [`--shutdown-timeout`](#shutdown-timeout) | time | `25s` | v0.15 |
| [`--sort-endpoints-by`](#sort-endpoints-by) | [endpoint\|ip\|name\|random] | `endpoint` | v0.11 |
| [`--enable-endpointslices-api`](#enable-endpointslices-api) | [true\|false] | `false` | v0.14 |
| [`--stats-collect-processing-period`](#stats) | time | `500ms` | v0.10 |
| [`--stop-handler`](#stats) | [true\|false] | `false` | v0.15 |
| [`--sync-period`](#sync-period) | time | `10m` | |
Expand Down Expand Up @@ -507,6 +508,14 @@ Defines in which order the endpoints of a backend should be sorted.

---

## --enable-endpointslices-api

Since v0.14

Uses EndpointSlices API info, rather than Endpoints API, to fetch service endpoints info. By default it is disabled. EndpointSlices API was stablised from Kubernetes v1.21.

---

## Stats

Configures an endpoint with statistics, debugging and health checks. The following URIs are provided:
Expand Down
5 changes: 3 additions & 2 deletions pkg/common/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ type Configuration struct {
ElectionID string
UpdateStatusOnShutdown bool

BackendShards int
SortEndpointsBy string
BackendShards int
SortEndpointsBy string
EnableEndpointSlicesAPI bool
}

// newIngressController creates an Ingress controller
Expand Down
10 changes: 10 additions & 0 deletions pkg/common/ingress/controller/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ Actually node listing isn't needed and it is always disabled`)
`DEPRECATED, this option is ignored. Use --watch-ingress-without-class
command-line option instead to define if ingress without class should be
tracked.`)

enableEndpointSlicesAPI = flags.Bool("enable-endpointslices-api", false,
`Enables EndpointSlices API and disables watching Endpoints API. Only enable
in
k8s >=1.21+`)
)

logLevel := new(klog.Level)
Expand Down Expand Up @@ -380,6 +385,10 @@ tracked.`)
klog.Info("running embedded haproxy, mode is daemon")
}

if *enableEndpointSlicesAPI {
klog.Infof("watching endpointslices - --enable-endpointslices-api is true")
}

if !(*reloadStrategy == "native" || *reloadStrategy == "reusesocket" || *reloadStrategy == "multibinder") {
klog.Exitf("Unsupported reload strategy: %v", *reloadStrategy)
}
Expand Down Expand Up @@ -575,6 +584,7 @@ tracked.`)
BackendShards: *backendShards,
SortEndpointsBy: sortEndpoints,
UseNodeInternalIP: *useNodeInternalIP,
EnableEndpointSlicesAPI: *enableEndpointSlicesAPI,
}

ic := newIngressController(config)
Expand Down
16 changes: 16 additions & 0 deletions pkg/controller/legacy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"time"

api "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networking "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -143,10 +144,20 @@ func createCache(
cfg.ForceNamespaceIsolation,
!cfg.DisablePodList,
cfg.ResyncPeriod,
cfg.EnableEndpointSlicesAPI,
)
return cache
}

func (c *k8scache) GetEndpointSlices(service *api.Service) ([]*discoveryv1.EndpointSlice, error) {
serviceNameLabel := map[string]string{"kubernetes.io/service-name": service.Name}
selector, err := buildLabelSelector(serviceNameLabel)
if err != nil {
return nil, err
}
return c.listers.endpointSliceLister.EndpointSlices(service.Namespace).List(selector)
}

func (c *k8scache) RunAsync(stopCh <-chan struct{}) {
c.listers.RunAsync(stopCh)
}
Expand Down Expand Up @@ -840,6 +851,8 @@ func (c *k8scache) Notify(old, cur interface{}) {
ch.NeedFullSync = true
case *api.Endpoints:
ch.EndpointsNew = append(ch.EndpointsNew, cur)
case *discoveryv1.EndpointSlice:
ch.EndpointSlicesUpd = append(ch.EndpointSlicesUpd, cur)
case *api.Service:
svc := cur
if old == nil {
Expand Down Expand Up @@ -962,6 +975,9 @@ func (c *k8scache) SwapChangedObjects() *convtypes.ChangedObjects {
for _, ep := range ch.EndpointsNew {
addChanges(convtypes.ResourceEndpoints, eventUpdate, ep.Namespace, ep.Name)
}
for _, eps := range ch.EndpointSlicesUpd {
addChanges(convtypes.ResourceEndpoints, eventUpdate, eps.Namespace, eps.Labels["kubernetes.io/service-name"])
}
lafolle marked this conversation as resolved.
Show resolved Hide resolved
for _, svc := range ch.ServicesDel {
addChanges(convtypes.ResourceService, eventDel, svc.Namespace, svc.Name)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/legacy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (hc *HAProxyController) configController() {
AcmeTrackTLSAnn: hc.cfg.AcmeTrackTLSAnn,
TrackInstances: hc.cfg.TrackOldInstances,
HasGateway: hc.cache.hasGateway(),
EnableEPSlices: hc.cfg.EnableEndpointSlicesAPI,
}
}

Expand Down
126 changes: 89 additions & 37 deletions pkg/controller/legacy/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ import (
"time"

api "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networking "k8s.io/api/networking/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
discovery "k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
informerscore "k8s.io/client-go/informers/core/v1"
informersdiscovery "k8s.io/client-go/informers/discovery/v1"
informersnetworking "k8s.io/client-go/informers/networking/v1"
"k8s.io/client-go/kubernetes/fake"
listerscore "k8s.io/client-go/listers/core/v1"
listersdiscovery "k8s.io/client-go/listers/discovery/v1"
listersnetworking "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
Expand All @@ -53,34 +56,37 @@ type ListerEvents interface {
}

type listers struct {
events ListerEvents
logger types.Logger
recorder record.EventRecorder
running bool
events ListerEvents
logger types.Logger
recorder record.EventRecorder
running bool
enableEndpointSlicesAPI bool
//
hasPodLister bool
//
ingressLister listersnetworking.IngressLister
ingressClassLister listersnetworking.IngressClassLister
gatewayLister gwapilistersgatewayv1alpha2.GatewayLister
gatewayClassLister gwapilistersgatewayv1alpha2.GatewayClassLister
httpRouteLister gwapilistersgatewayv1alpha2.HTTPRouteLister
endpointLister listerscore.EndpointsLister
serviceLister listerscore.ServiceLister
secretLister listerscore.SecretLister
configMapLister listerscore.ConfigMapLister
podLister listerscore.PodLister
ingressLister listersnetworking.IngressLister
ingressClassLister listersnetworking.IngressClassLister
gatewayLister gwapilistersgatewayv1alpha2.GatewayLister
gatewayClassLister gwapilistersgatewayv1alpha2.GatewayClassLister
httpRouteLister gwapilistersgatewayv1alpha2.HTTPRouteLister
endpointLister listerscore.EndpointsLister
endpointSliceLister listersdiscovery.EndpointSliceLister
serviceLister listerscore.ServiceLister
secretLister listerscore.SecretLister
configMapLister listerscore.ConfigMapLister
podLister listerscore.PodLister
//
ingressInformer cache.SharedInformer
ingressClassInformer cache.SharedInformer
gatewayInformer cache.SharedInformer
gatewayClassInformer cache.SharedInformer
httpRouteInformer cache.SharedInformer
endpointInformer cache.SharedInformer
serviceInformer cache.SharedInformer
secretInformer cache.SharedInformer
configMapInformer cache.SharedInformer
podInformer cache.SharedInformer
ingressInformer cache.SharedInformer
ingressClassInformer cache.SharedInformer
gatewayInformer cache.SharedInformer
gatewayClassInformer cache.SharedInformer
httpRouteInformer cache.SharedInformer
endpointInformer cache.SharedInformer
endpointSliceInformer cache.SharedInformer
serviceInformer cache.SharedInformer
secretInformer cache.SharedInformer
configMapInformer cache.SharedInformer
podInformer cache.SharedInformer
}

func createListers(
Expand All @@ -93,6 +99,7 @@ func createListers(
isolateNamespace bool,
podWatch bool,
resync time.Duration,
enableEndpointSlicesAPI bool,
) *listers {
clusterWatch := watchNamespace == api.NamespaceAll
clusterOption := informers.WithTweakListOptions(nil)
Expand All @@ -112,13 +119,18 @@ func createListers(
localInformer = informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
}
l := &listers{
events: events,
recorder: recorder,
logger: logger,
events: events,
recorder: recorder,
logger: logger,
enableEndpointSlicesAPI: enableEndpointSlicesAPI,
}
l.createIngressLister(ingressInformer.Networking().V1().Ingresses())
l.createIngressClassLister(ingressInformer.Networking().V1().IngressClasses())
l.createEndpointLister(resourceInformer.Core().V1().Endpoints())
if enableEndpointSlicesAPI {
l.createEndpointSliceLister(resourceInformer.Discovery().V1().EndpointSlices())
} else {
l.createEndpointLister(resourceInformer.Core().V1().Endpoints())
}
l.createServiceLister(resourceInformer.Core().V1().Services())
l.createSecretLister(resourceInformer.Core().V1().Secrets())
l.createConfigMapLister(resourceInformer.Core().V1().ConfigMaps())
Expand Down Expand Up @@ -200,19 +212,33 @@ func (l *listers) RunAsync(stopCh <-chan struct{}) {

// initialize listers and informers
go l.ingressInformer.Run(stopCh)
go l.endpointInformer.Run(stopCh)
go l.serviceInformer.Run(stopCh)
go l.secretInformer.Run(stopCh)
go l.configMapInformer.Run(stopCh)
go l.podInformer.Run(stopCh)
synced := cache.WaitForCacheSync(stopCh,
l.ingressInformer.HasSynced,
l.endpointInformer.HasSynced,
l.serviceInformer.HasSynced,
l.secretInformer.HasSynced,
l.configMapInformer.HasSynced,
l.podInformer.HasSynced,
)
var synced bool
if l.enableEndpointSlicesAPI {
go l.endpointSliceInformer.Run(stopCh)
synced = cache.WaitForCacheSync(stopCh,
l.ingressInformer.HasSynced,
l.endpointSliceInformer.HasSynced,
l.serviceInformer.HasSynced,
l.secretInformer.HasSynced,
l.configMapInformer.HasSynced,
l.podInformer.HasSynced,
)

} else {
go l.endpointInformer.Run(stopCh)
synced = cache.WaitForCacheSync(stopCh,
l.ingressInformer.HasSynced,
l.endpointInformer.HasSynced,
l.serviceInformer.HasSynced,
l.secretInformer.HasSynced,
l.configMapInformer.HasSynced,
l.podInformer.HasSynced,
)
}
if synced {
l.logger.Info("cache successfully synced")
l.running = true
Expand Down Expand Up @@ -401,6 +427,32 @@ func (l *listers) createHTTPRouteLister(informer gwapiinformersgatewayv1alpha2.H
})
}

func (l *listers) createEndpointSliceLister(informer informersdiscovery.EndpointSliceInformer) {
l.endpointSliceLister = informer.Lister()
l.endpointSliceInformer = informer.Informer()
l.endpointSliceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{

// A new EndpointSlice shows up
AddFunc: func(obj interface{}) {
l.events.Notify(nil, obj)
},

// Existing EndpointSlice is updated
UpdateFunc: func(old, cur interface{}) {
oldEPSlice := old.(*discoveryv1.EndpointSlice)
curEPSlice := cur.(*discoveryv1.EndpointSlice)
if !reflect.DeepEqual(oldEPSlice.Endpoints, curEPSlice.Endpoints) {
l.events.Notify(oldEPSlice, curEPSlice)
}
},

// Existing EndpointSlice is deleted
DeleteFunc: func(obj interface{}) {
l.events.Notify(obj, nil)
},
})
}

func (l *listers) createEndpointLister(informer informerscore.EndpointsInformer) {
l.endpointLister = informer.Lister()
l.endpointInformer = informer.Informer()
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/services/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/go-logr/logr"
api "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -287,6 +288,12 @@ func (c *c) GetService(defaultNamespace, serviceName string) (*api.Service, erro

}

func (c *c) GetEndpointSlices(service *api.Service) ([]*discoveryv1.EndpointSlice, error) {
// TODO: endpoint slices to be implemented for new controller runtime. For now
// only exists in legacy controller.
return nil, nil
}

func (c *c) GetEndpoints(service *api.Service) (*api.Endpoints, error) {
ep := api.Endpoints{}
err := c.client.Get(c.ctx, types.NamespacedName{
Expand Down
20 changes: 11 additions & 9 deletions pkg/converters/configmap/tcpservices.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,20 @@ type TCPServicesConverter interface {
// NewTCPServicesConverter ...
func NewTCPServicesConverter(options *convtypes.ConverterOptions, haproxy haproxy.Config, changed *convtypes.ChangedObjects) TCPServicesConverter {
return &tcpSvcConverter{
logger: options.Logger,
cache: options.Cache,
haproxy: haproxy,
changed: changed,
logger: options.Logger,
cache: options.Cache,
haproxy: haproxy,
changed: changed,
enableEndpointSlicesAPI: options.EnableEPSlices,
}
}

type tcpSvcConverter struct {
logger types.Logger
cache convtypes.Cache
haproxy haproxy.Config
changed *convtypes.ChangedObjects
logger types.Logger
cache convtypes.Cache
haproxy haproxy.Config
changed *convtypes.ChangedObjects
enableEndpointSlicesAPI bool
}

var regexValidTime = regexp.MustCompile(`^[0-9]+(us|ms|s|m|h|d)$`)
Expand Down Expand Up @@ -90,7 +92,7 @@ func (c *tcpSvcConverter) Sync() {
c.logger.Warn("skipping TCP service on public port %d: port not found: %s:%s", publicport, svc.name, svc.port)
continue
}
addrs, _, err := convutils.CreateEndpoints(c.cache, service, svcport)
addrs, _, err := convutils.CreateEndpoints(c.cache, service, svcport, c.enableEndpointSlicesAPI)
if err != nil {
c.logger.Warn("skipping TCP service on public port %d: %v", svc.port, err)
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/converters/configmap/tcpservices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestTCPSvcSync(t *testing.T) {
c := setup(t)
for svckey, endpoints := range test.svcmock {
svcport := strings.Split(svckey, ":")
svc, ep := conv_helper.CreateService(svcport[0], svcport[1], endpoints)
svc, ep, _ := conv_helper.CreateService(svcport[0], svcport[1], endpoints)
c.cache.SvcList = append(c.cache.SvcList, svc)
c.cache.EpList[svcport[0]] = ep
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/converters/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (c *converter) createBackend(source *Source, index string, backendRefs []ga
c.logger.Warn("skipping service '%s' on %s: port '%s' not found", back.Name, source, portStr)
continue
}
epready, _, err := convutils.CreateEndpoints(c.cache, svc, svcport)
epready, _, err := convutils.CreateEndpoints(c.cache, svc, svcport, c.options.EnableEPSlices)
if err != nil {
c.logger.Warn("skipping service '%s' on %s: %v", back.Name, source, err)
continue
Expand Down
Loading