Skip to content

Commit

Permalink
Merge pull request #4076 from k8r-io/fix-hangs-on-traefik-listeners
Browse files Browse the repository at this point in the history
Fix timeout for traefik-proxy source
  • Loading branch information
k8s-ci-robot committed Dec 27, 2023
2 parents 7d7317a + 9f29220 commit 97a8fa3
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 69 deletions.
18 changes: 18 additions & 0 deletions docs/tutorials/traefik-proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,21 @@ Now that we have verified that ExternalDNS will automatically manage Traefik DNS
$ kubectl delete -f traefik-ingress.yaml
$ kubectl delete -f externaldns.yaml
```

## Additional Flags

| Flag | Description |
| --- | --- |
| --traefik-disable-legacy | Disable listeners on Resources under traefik.containo.us |
| --traefik-disable-new | Disable listeners on Resources under traefik.io |

### Disabling Resource Listeners

Traefik has deprecated the legacy API group, traefik.containo.us, in favor of traefik.io. By default the traefik-proxy source will listen for resources under both API groups; however, this may cause timeouts with the following message

```
FATA[0060] failed to sync traefik.io/v1alpha1, Resource=ingressroutes: context deadline exceeded
```

In this case you can disable one or the other API groups with `--traefik-disable-new` or `--traefik-disable-legacy`

2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ func main() {
OCPRouterName: cfg.OCPRouterName,
UpdateEvents: cfg.UpdateEvents,
ResolveLoadBalancerHostname: cfg.ResolveServiceLoadBalancerHostname,
TraefikDisableLegacy: cfg.TraefikDisableLegacy,
TraefikDisableNew: cfg.TraefikDisableNew,
}

// Lookup all the selected sources by names and pass them the desired configuration.
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/externaldns/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ type Config struct {
WebhookProviderReadTimeout time.Duration
WebhookProviderWriteTimeout time.Duration
WebhookServer bool
TraefikDisableLegacy bool
TraefikDisableNew bool
}

var defaultConfig = &Config{
Expand Down Expand Up @@ -369,6 +371,8 @@ var defaultConfig = &Config{
WebhookProviderReadTimeout: 5 * time.Second,
WebhookProviderWriteTimeout: 10 * time.Second,
WebhookServer: false,
TraefikDisableLegacy: false,
TraefikDisableNew: false,
}

// NewConfig returns new Config object
Expand Down Expand Up @@ -456,6 +460,8 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("default-targets", "Set globally default host/IP that will apply as a target instead of source addresses. Specify multiple times for multiple targets (optional)").StringsVar(&cfg.DefaultTargets)
app.Flag("target-net-filter", "Limit possible targets by a net filter; specify multiple times for multiple possible nets (optional)").StringsVar(&cfg.TargetNetFilter)
app.Flag("exclude-target-net", "Exclude target nets (optional)").StringsVar(&cfg.ExcludeTargetNets)
app.Flag("traefik-disable-legacy", "Disable listeners on Resources under the traefik.containo.us API Group").Default(strconv.FormatBool(defaultConfig.TraefikDisableLegacy)).BoolVar(&cfg.TraefikDisableLegacy)
app.Flag("traefik-disable-new", "Disable listeners on Resources under the traefik.io API Group").Default(strconv.FormatBool(defaultConfig.TraefikDisableNew)).BoolVar(&cfg.TraefikDisableNew)

// Flags related to providers
providers := []string{"akamai", "alibabacloud", "aws", "aws-sd", "azure", "azure-dns", "azure-private-dns", "bluecat", "civo", "cloudflare", "coredns", "designate", "digitalocean", "dnsimple", "dyn", "exoscale", "gandi", "godaddy", "google", "ibmcloud", "infoblox", "inmemory", "linode", "ns1", "oci", "ovh", "pdns", "pihole", "plural", "rcodezero", "rdns", "rfc2136", "safedns", "scaleway", "skydns", "tencentcloud", "transip", "ultradns", "vinyldns", "vultr", "webhook"}
Expand Down
4 changes: 3 additions & 1 deletion source/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Config struct {
OCPRouterName string
UpdateEvents bool
ResolveLoadBalancerHostname bool
TraefikDisableLegacy bool
TraefikDisableNew bool
}

// ClientGenerator provides clients
Expand Down Expand Up @@ -300,7 +302,7 @@ func BuildWithConfig(ctx context.Context, source string, p ClientGenerator, cfg
if err != nil {
return nil, err
}
return NewTraefikSource(ctx, dynamicClient, kubernetesClient, cfg.Namespace, cfg.AnnotationFilter, cfg.IgnoreHostnameAnnotation)
return NewTraefikSource(ctx, dynamicClient, kubernetesClient, cfg.Namespace, cfg.AnnotationFilter, cfg.IgnoreHostnameAnnotation, cfg.TraefikDisableLegacy, cfg.TraefikDisableNew)
case "openshift-route":
ocpClient, err := p.OpenShiftClient()
if err != nil {
Expand Down
141 changes: 79 additions & 62 deletions source/traefik_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,48 +93,54 @@ type traefikSource struct {
unstructuredConverter *unstructuredConverter
}

func NewTraefikSource(ctx context.Context, dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string, ignoreHostnameAnnotation bool) (Source, error) {
func NewTraefikSource(ctx context.Context, dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string, ignoreHostnameAnnotation bool, disableLegacy bool, disableNew bool) (Source, error) {
// Use shared informer to listen for add/update/delete of Host in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed.
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil)
ingressRouteInformer := informerFactory.ForResource(ingressrouteGVR)
ingressRouteTcpInformer := informerFactory.ForResource(ingressrouteTCPGVR)
ingressRouteUdpInformer := informerFactory.ForResource(ingressrouteUDPGVR)
oldIngressRouteInformer := informerFactory.ForResource(oldIngressrouteGVR)
oldIngressRouteTcpInformer := informerFactory.ForResource(oldIngressrouteTCPGVR)
oldIngressRouteUdpInformer := informerFactory.ForResource(oldIngressrouteUDPGVR)
var ingressRouteInformer, ingressRouteTcpInformer, ingressRouteUdpInformer informers.GenericInformer
var oldIngressRouteInformer, oldIngressRouteTcpInformer, oldIngressRouteUdpInformer informers.GenericInformer

// Add default resource event handlers to properly initialize informers.
ingressRouteInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
ingressRouteTcpInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
ingressRouteUdpInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
oldIngressRouteInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
oldIngressRouteTcpInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
oldIngressRouteUdpInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
if !disableNew {
ingressRouteInformer = informerFactory.ForResource(ingressrouteGVR)
ingressRouteTcpInformer = informerFactory.ForResource(ingressrouteTCPGVR)
ingressRouteUdpInformer = informerFactory.ForResource(ingressrouteUDPGVR)
ingressRouteInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
ingressRouteTcpInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
ingressRouteUdpInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
}
if !disableLegacy {
oldIngressRouteInformer = informerFactory.ForResource(oldIngressrouteGVR)
oldIngressRouteTcpInformer = informerFactory.ForResource(oldIngressrouteTCPGVR)
oldIngressRouteUdpInformer = informerFactory.ForResource(oldIngressrouteUDPGVR)
oldIngressRouteInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
oldIngressRouteTcpInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
oldIngressRouteUdpInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
}

informerFactory.Start((ctx.Done()))

Expand Down Expand Up @@ -167,38 +173,49 @@ func NewTraefikSource(ctx context.Context, dynamicKubeClient dynamic.Interface,
func (ts *traefikSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
var endpoints []*endpoint.Endpoint

ingressRouteEndpoints, err := ts.ingressRouteEndpoints()
if err != nil {
return nil, err
if ts.ingressRouteInformer != nil {
ingressRouteEndpoints, err := ts.ingressRouteEndpoints()
if err != nil {
return nil, err
}
endpoints = append(endpoints, ingressRouteEndpoints...)
}
oldIngressRouteEndpoints, err := ts.oldIngressRouteEndpoints()
if err != nil {
return nil, err
if ts.oldIngressRouteInformer != nil {
oldIngressRouteEndpoints, err := ts.oldIngressRouteEndpoints()
if err != nil {
return nil, err
}
endpoints = append(endpoints, oldIngressRouteEndpoints...)
}
ingressRouteTCPEndpoints, err := ts.ingressRouteTCPEndpoints()
if err != nil {
return nil, err
if ts.ingressRouteTcpInformer != nil {
ingressRouteTcpEndpoints, err := ts.ingressRouteTCPEndpoints()
if err != nil {
return nil, err
}
endpoints = append(endpoints, ingressRouteTcpEndpoints...)
}
oldIngressRouteTCPEndpoints, err := ts.oldIngressRouteTCPEndpoints()
if err != nil {
return nil, err
if ts.oldIngressRouteTcpInformer != nil {
oldIngressRouteTcpEndpoints, err := ts.oldIngressRouteTCPEndpoints()
if err != nil {
return nil, err
}
endpoints = append(endpoints, oldIngressRouteTcpEndpoints...)
}
ingressRouteUDPEndpoints, err := ts.ingressRouteUDPEndpoints()
if err != nil {
return nil, err
if ts.ingressRouteUdpInformer != nil {
ingressRouteUdpEndpoints, err := ts.ingressRouteUDPEndpoints()
if err != nil {
return nil, err
}
endpoints = append(endpoints, ingressRouteUdpEndpoints...)
}
oldIngressRouteUDPEndpoints, err := ts.oldIngressRouteUDPEndpoints()
if err != nil {
return nil, err
if ts.oldIngressRouteUdpInformer != nil {
oldIngressRouteUdpEndpoints, err := ts.oldIngressRouteUDPEndpoints()
if err != nil {
return nil, err
}
endpoints = append(endpoints, oldIngressRouteUdpEndpoints...)
}

endpoints = append(endpoints, ingressRouteEndpoints...)
endpoints = append(endpoints, ingressRouteTCPEndpoints...)
endpoints = append(endpoints, ingressRouteUDPEndpoints...)
endpoints = append(endpoints, oldIngressRouteEndpoints...)
endpoints = append(endpoints, oldIngressRouteTCPEndpoints...)
endpoints = append(endpoints, oldIngressRouteUDPEndpoints...)

for _, ep := range endpoints {
sort.Sort(ep.Targets)
}
Expand Down
Loading

0 comments on commit 97a8fa3

Please sign in to comment.