Skip to content

Commit

Permalink
Add --watchers flag to automatically trigger synchronization loop on …
Browse files Browse the repository at this point in the history
…adds, updates, and deletes for supported source resources.

Address gofmt and lint errors.
  • Loading branch information
jlamillan committed Oct 30, 2018
1 parent 21b7c3a commit 8ae3e21
Show file tree
Hide file tree
Showing 14 changed files with 139 additions and 10 deletions.
13 changes: 12 additions & 1 deletion controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type Controller struct {
// The policy that defines which changes to DNS records are allowed
Policy plan.Policy
// The interval between individual synchronizations
Interval time.Duration
Interval time.Duration
RegisterHandler bool
}

// RunOnce runs a single iteration of a reconciliation loop.
Expand Down Expand Up @@ -94,6 +95,16 @@ func (c *Controller) RunOnce() error {

// Run runs RunOnce in a loop with a delay until stopChan receives a value.
func (c *Controller) Run(stopChan <-chan struct{}) {

if c.RegisterHandler {
// Add RunOnce as the handler function that will be called when sources that support being watched has changed.
// Note that k8s Informers will perform an initial list operation, which results in the handler
// function initially being called for every Service/Ingress that exists (in the namespace).
if err := c.Source.AddEventHandler(c.RunOnce, stopChan); err != nil {
log.Error(err)
}
}

ticker := time.NewTicker(c.Interval)
defer ticker.Stop()
for {
Expand Down
8 changes: 6 additions & 2 deletions internal/testutils/mock_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ limitations under the License.
package testutils

import (
"github.com/stretchr/testify/mock"

"github.com/kubernetes-incubator/external-dns/endpoint"
"github.com/stretchr/testify/mock"
)

// MockSource returns mock endpoints.
Expand All @@ -38,3 +37,8 @@ func (m *MockSource) Endpoints() ([]*endpoint.Endpoint, error) {

return endpoints.([]*endpoint.Endpoint), args.Error(1)
}

// AddEventHandler implements the Source interface, does nothing in mock mode.
func (m *MockSource) AddEventHandler(handler func() error, stopChan <-chan struct{}) error {
return nil
}
22 changes: 15 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/kubernetes-incubator/external-dns/provider"
"github.com/kubernetes-incubator/external-dns/registry"
"github.com/kubernetes-incubator/external-dns/source"
"time"
)

func main() {
Expand Down Expand Up @@ -85,9 +86,15 @@ func main() {

// Lookup all the selected sources by names and pass them the desired configuration.
sources, err := source.ByNames(&source.SingletonClientGenerator{
KubeConfig: cfg.KubeConfig,
KubeMaster: cfg.Master,
RequestTimeout: cfg.RequestTimeout,
KubeConfig: cfg.KubeConfig,
KubeMaster: cfg.Master,
// If watchers are enabled, disable timeout.
RequestTimeout: func() time.Duration {
if cfg.WatchersEnabled {
return 0
}
return cfg.RequestTimeout
}(),
}, cfg.Sources, sourceCfg)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -223,10 +230,11 @@ func main() {
}

ctrl := controller.Controller{
Source: endpointsSource,
Registry: r,
Policy: policy,
Interval: cfg.Interval,
Source: endpointsSource,
Registry: r,
Policy: policy,
Interval: cfg.Interval,
RegisterHandler: cfg.WatchersEnabled,
}

if cfg.Once {
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/externaldns/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type Config struct {
Interval time.Duration
Once bool
DryRun bool
WatchersEnabled bool
LogFormat string
MetricsAddress string
LogLevel string
Expand Down Expand Up @@ -156,6 +157,7 @@ var defaultConfig = &Config{
Interval: time.Minute,
Once: false,
DryRun: false,
WatchersEnabled: false,
LogFormat: "text",
MetricsAddress: ":7979",
LogLevel: logrus.InfoLevel.String(),
Expand Down Expand Up @@ -297,6 +299,7 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("interval", "The interval between two consecutive synchronizations in duration format (default: 1m)").Default(defaultConfig.Interval.String()).DurationVar(&cfg.Interval)
app.Flag("once", "When enabled, exits the synchronization loop after the first iteration (default: disabled)").BoolVar(&cfg.Once)
app.Flag("dry-run", "When enabled, prints DNS record changes rather than actually performing them (default: disabled)").BoolVar(&cfg.DryRun)
app.Flag("watchers", "When enabled, watches supported sources to process changes immediately in addition to running every interval (default: disabled)").BoolVar(&cfg.WatchersEnabled)

// Miscellaneous flags
app.Flag("log-format", "The format in which log messages are printed (default: text, options: text, json)").Default(defaultConfig.LogFormat).EnumVar(&cfg.LogFormat, "text", "json")
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/externaldns/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
Interval: time.Minute,
Once: false,
DryRun: false,
WatchersEnabled: false,
LogFormat: "text",
MetricsAddress: ":7979",
LogLevel: logrus.InfoLevel.String(),
Expand Down Expand Up @@ -123,6 +124,7 @@ var (
Interval: 10 * time.Minute,
Once: true,
DryRun: true,
WatchersEnabled: true,
LogFormat: "json",
MetricsAddress: "127.0.0.1:9099",
LogLevel: logrus.DebugLevel.String(),
Expand Down Expand Up @@ -201,6 +203,7 @@ func TestParseFlags(t *testing.T) {
"--interval=10m",
"--once",
"--dry-run",
"--watchers",
"--log-format=json",
"--metrics-address=127.0.0.1:9099",
"--log-level=debug",
Expand Down Expand Up @@ -260,6 +263,7 @@ func TestParseFlags(t *testing.T) {
"EXTERNAL_DNS_INTERVAL": "10m",
"EXTERNAL_DNS_ONCE": "1",
"EXTERNAL_DNS_DRY_RUN": "1",
"EXTERNAL_DNS_WATCHERS": "1",
"EXTERNAL_DNS_LOG_FORMAT": "json",
"EXTERNAL_DNS_METRICS_ADDRESS": "127.0.0.1:9099",
"EXTERNAL_DNS_LOG_LEVEL": "debug",
Expand Down
5 changes: 5 additions & 0 deletions source/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package source

import (
"encoding/gob"
"errors"
"net"
"time"

Expand Down Expand Up @@ -63,3 +64,7 @@ func (cs *connectorSource) Endpoints() ([]*endpoint.Endpoint, error) {

return endpoints, nil
}

func (cs *connectorSource) AddEventHandler(handler func() error, stopChan <-chan struct{}) error {
return errors.New("connector watcher not supported")
}
5 changes: 5 additions & 0 deletions source/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package source

import (
"errors"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -107,6 +108,10 @@ func NewCRDSource(crdClient rest.Interface, namespace, kind string, scheme *runt
}, nil
}

func (cs *crdSource) AddEventHandler(handler func() error, stopChan <-chan struct{}) error {
return errors.New("CRD watcher not supported")
}

// Endpoints returns endpoint objects.
func (cs *crdSource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := []*endpoint.Endpoint{}
Expand Down
4 changes: 4 additions & 0 deletions source/dedup_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ func (ms *dedupSource) Endpoints() ([]*endpoint.Endpoint, error) {

return result, nil
}

func (ms *dedupSource) AddEventHandler(handler func() error, stopChan <-chan struct{}) error {
return ms.source.AddEventHandler(handler, stopChan)
}
5 changes: 5 additions & 0 deletions source/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Note: currently only supports IP targets (A records), not hostname targets
package source

import (
"errors"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -54,6 +55,10 @@ func NewFakeSource(fqdnTemplate string) (Source, error) {
}, nil
}

func (sc *fakeSource) AddEventHandler(handler func() error, stopChan <-chan struct{}) error {
return errors.New("fake watcher not supported")
}

// Endpoints returns endpoint objects.
func (sc *fakeSource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := make([]*endpoint.Endpoint, 10)
Expand Down
5 changes: 5 additions & 0 deletions source/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package source

import (
"bytes"
"errors"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -148,6 +149,10 @@ func (sc *gatewaySource) Endpoints() ([]*endpoint.Endpoint, error) {
return endpoints, nil
}

func (sc *gatewaySource) AddEventHandler(handler func() error, stopChan <-chan struct{}) error {
return errors.New("gateway watcher not supported")
}

func (sc *gatewaySource) endpointsFromTemplate(config *istiomodel.Config) ([]*endpoint.Endpoint, error) {
// Process the whole template string
var buf bytes.Buffer
Expand Down
32 changes: 32 additions & 0 deletions source/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
"text/template"

log "github.com/sirupsen/logrus"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"

"k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"

"github.com/kubernetes-incubator/external-dns/endpoint"
extinformers "k8s.io/client-go/informers/extensions/v1beta1"
)

// ingressSource is an implementation of Source for Kubernetes ingress objects.
Expand All @@ -43,6 +46,8 @@ type ingressSource struct {
annotationFilter string
fqdnTemplate *template.Template
combineFQDNAnnotation bool
informerFactory kubeinformers.SharedInformerFactory
ingressInformer extinformers.IngressInformer
}

// NewIngressSource creates a new ingressSource with the given config.
Expand All @@ -60,12 +65,17 @@ func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilt
}
}

// Use shared informer to listen for add/update/delete of ingresses in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed.
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))

return &ingressSource{
client: kubeClient,
namespace: namespace,
annotationFilter: annotationFilter,
fqdnTemplate: tmpl,
combineFQDNAnnotation: combineFqdnAnnotation,
informerFactory: informerFactory,
}, nil
}

Expand Down Expand Up @@ -125,6 +135,28 @@ func (sc *ingressSource) Endpoints() ([]*endpoint.Endpoint, error) {
return endpoints, nil
}

func (sc *ingressSource) AddEventHandler(handler func() error, stopChan <-chan struct{}) error {
log.Debug("Adding handler for ingress")

sc.ingressInformer = sc.informerFactory.Extensions().V1beta1().Ingresses()

sc.ingressInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { handler() },
UpdateFunc: func(old interface{}, new interface{}) { handler() },
DeleteFunc: func(obj interface{}) { handler() },
},
)

sc.informerFactory.Start(stopChan)
// wait for the local cache to be populated.
if !cache.WaitForCacheSync(stopChan, sc.ingressInformer.Informer().HasSynced) {
log.Error("failed to wait for caches to sync")
}

return nil
}

func (sc *ingressSource) endpointsFromTemplate(ing *v1beta1.Ingress) ([]*endpoint.Endpoint, error) {
// Process the whole template string
var buf bytes.Buffer
Expand Down
11 changes: 11 additions & 0 deletions source/multi_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ func (ms *multiSource) Endpoints() ([]*endpoint.Endpoint, error) {
return result, nil
}

func (ms *multiSource) AddEventHandler(handler func() error, stopChan <-chan struct{}) error {

for _, s := range ms.children {
err := s.AddEventHandler(handler, stopChan)
if err != nil {
return err
}
}
return nil
}

// NewMultiSource creates a new multiSource.
func NewMultiSource(children []Source) Source {
return &multiSource{children: children}
Expand Down
31 changes: 31 additions & 0 deletions source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package source
import (
"bytes"
"fmt"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"sort"
"strings"
"text/template"
Expand Down Expand Up @@ -53,6 +56,8 @@ type serviceSource struct {
combineFQDNAnnotation bool
publishInternal bool
publishHostIP bool
informerFactory kubeinformers.SharedInformerFactory
serviceInformer coreinformers.ServiceInformer
serviceTypeFilter map[string]struct{}
}

Expand All @@ -71,6 +76,10 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
}
}

// Use shared informer to listen for add/update/delete of services in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))

// Transform the slice into a map so it will
// be way much easier and fast to filter later
serviceTypes := make(map[string]struct{})
Expand All @@ -87,6 +96,7 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
combineFQDNAnnotation: combineFqdnAnnotation,
publishInternal: publishInternal,
publishHostIP: publishHostIP,
informerFactory: informerFactory,
serviceTypeFilter: serviceTypes,
}, nil
}
Expand Down Expand Up @@ -162,6 +172,27 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
return endpoints, nil
}

func (sc *serviceSource) AddEventHandler(handler func() error, stopChan <-chan struct{}) error {
log.Debugf("Adding handler for service")
sc.serviceInformer = sc.informerFactory.Core().V1().Services()

sc.serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { handler() },
UpdateFunc: func(old interface{}, new interface{}) { handler() },
DeleteFunc: func(obj interface{}) { handler() },
},
)

sc.informerFactory.Start(stopChan)
// wait for the local cache to be populated.
if !cache.WaitForCacheSync(stopChan, sc.serviceInformer.Informer().HasSynced) {
log.Error("failed to wait for caches to sync")
}

return nil
}

func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint

Expand Down
1 change: 1 addition & 0 deletions source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
// Source defines the interface Endpoint sources should implement.
type Source interface {
Endpoints() ([]*endpoint.Endpoint, error)
AddEventHandler(func() error, <-chan struct{}) error
}

func getTTLFromAnnotations(annotations map[string]string) (endpoint.TTL, error) {
Expand Down

0 comments on commit 8ae3e21

Please sign in to comment.