diff --git a/controller/controller_test.go b/controller/controller_test.go index 3d0b0ecfa5..6ca37edb45 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -21,6 +21,7 @@ import ( "errors" "reflect" "testing" + "time" "github.com/kubernetes-sigs/external-dns/endpoint" "github.com/kubernetes-sigs/external-dns/internal/testutils" @@ -151,3 +152,44 @@ func TestRunOnce(t *testing.T) { // Validate that the mock source was called. source.AssertExpectations(t) } + +// TestSourceEventHandler tests that the Controller can use a Source's registered handler as a callback. +func TestSourceEventHandler(t *testing.T) { + source := new(testutils.MockSource) + + handlerCh := make(chan bool) + timeoutCh := make(chan bool, 1) + stopChan := make(chan struct{}, 1) + + ctrl := &Controller{ + Source: source, + Registry: nil, + Policy: &plan.SyncPolicy{}, + } + + // Define and register a simple handler that sends a message to a channel to show it was called. + handler := func() error { + handlerCh <- true + return nil + } + // Example of preventing handler from being called more than once every 5 seconds. + ctrl.Source.AddEventHandler(handler, stopChan, 5*time.Second) + + // Send timeout message after 10 seconds to fail test if handler is not called. + go func() { + time.Sleep(10 * time.Second) + timeoutCh <- true + }() + + // Wait until we either receive a message from handlerCh or timeoutCh channel after 10 seconds. + select { + case msg := <-handlerCh: + assert.True(t, msg) + case <-timeoutCh: + assert.Fail(t, "timed out waiting for event handler to be called") + } + + close(stopChan) + close(handlerCh) + close(timeoutCh) +} diff --git a/go.mod b/go.mod index 7b858af448..1ab92d26bf 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,7 @@ require ( k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719 k8s.io/client-go v10.0.0+incompatible k8s.io/kube-openapi v0.0.0-20190401085232-94e1e7b7574c // indirect + k8s.io/kubernetes v1.14.1 ) replace ( diff --git a/go.sum b/go.sum index 1877062ea5..50cf26f2dd 100644 --- a/go.sum +++ b/go.sum @@ -782,6 +782,8 @@ k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH k8s.io/kube-openapi v0.0.0-20190401085232-94e1e7b7574c h1:kJCzg2vGCzah5icgkKR7O1Dzn0NA2iGlym27sb0ZfGE= k8s.io/kube-openapi v0.0.0-20190401085232-94e1e7b7574c/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc= k8s.io/kubernetes v1.13.1/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk= +k8s.io/kubernetes v1.14.1 h1:I9F52h5sqVxBmoSsBlNQ0YygNcukDilkpGxUbJRoBoY= +k8s.io/kubernetes v1.14.1/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk= k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0= k8s.io/utils v0.0.0-20190607212802-c55fbcfc754a/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= mvdan.cc/unparam v0.0.0-20190720180237-d51796306d8f/go.mod h1:4G1h5nDURzA3bwVMZIVpwbkw+04kSxk3rAtzlimaUJw= diff --git a/internal/testutils/mock_source.go b/internal/testutils/mock_source.go index 6a9db805ff..112c3e7c23 100644 --- a/internal/testutils/mock_source.go +++ b/internal/testutils/mock_source.go @@ -17,6 +17,8 @@ limitations under the License. package testutils import ( + "time" + "github.com/stretchr/testify/mock" "github.com/kubernetes-sigs/external-dns/endpoint" @@ -38,3 +40,23 @@ func (m *MockSource) Endpoints() ([]*endpoint.Endpoint, error) { return endpoints.([]*endpoint.Endpoint), args.Error(1) } + +// AddEventHandler adds an event handler function that's called when sources that support such a thing have changed. +func (m *MockSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { + // Execute callback handler no more than once per minInterval, until a message on stopChan is received. + go func() { + var lastCallbackTime time.Time + for { + select { + case <-stopChan: + return + default: + now := time.Now() + if now.After(lastCallbackTime.Add(minInterval)) { + handler() + lastCallbackTime = time.Now() + } + } + } + }() +} diff --git a/main.go b/main.go index 658d8e2599..8fe83aae05 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,7 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" @@ -90,9 +91,15 @@ func main() { // Lookup all the selected sources by names and pass them the desired configuration. sources, err := source.ByNames(&source.SingletonClientGenerator{ - KubeConfig: cfg.KubeConfig, - KubeMaster: cfg.Master, - RequestTimeout: cfg.RequestTimeout, + KubeConfig: cfg.KubeConfig, + KubeMaster: cfg.Master, + // If update events are enabled, disable timeout. + RequestTimeout: func() time.Duration { + if cfg.UpdateEvents { + return 0 + } + return cfg.RequestTimeout + }(), }, cfg.Sources, sourceCfg) if err != nil { log.Fatal(err) @@ -265,6 +272,13 @@ func main() { Interval: cfg.Interval, } + if cfg.UpdateEvents { + // Add RunOnce as the handler function that will be called when ingress/service sources have changed. + // Note that k8s Informers will perform an initial list operation, which results in the handler + // function initially being called for every Service/Ingress that exists limted by minInterval. + ctrl.Source.AddEventHandler(ctrl.RunOnce, stopChan, 1*time.Minute) + } + if cfg.Once { err := ctrl.RunOnce() if err != nil { diff --git a/pkg/apis/externaldns/types.go b/pkg/apis/externaldns/types.go index e6ea64c4c5..8d0a9d2446 100644 --- a/pkg/apis/externaldns/types.go +++ b/pkg/apis/externaldns/types.go @@ -103,6 +103,7 @@ type Config struct { Interval time.Duration Once bool DryRun bool + UpdateEvents bool LogFormat string MetricsAddress string LogLevel string @@ -192,6 +193,7 @@ var defaultConfig = &Config{ Interval: time.Minute, Once: false, DryRun: false, + UpdateEvents: false, LogFormat: "text", MetricsAddress: ":7979", LogLevel: logrus.InfoLevel.String(), @@ -371,6 +373,7 @@ func (cfg *Config) ParseFlags(args []string) error { app.Flag("interval", "The interval between two consecutive synchronizations in duration format (default: 1m)").Default(defaultConfig.Interval.String()).DurationVar(&cfg.Interval) app.Flag("once", "When enabled, exits the synchronization loop after the first iteration (default: disabled)").BoolVar(&cfg.Once) app.Flag("dry-run", "When enabled, prints DNS record changes rather than actually performing them (default: disabled)").BoolVar(&cfg.DryRun) + app.Flag("events", "When enabled, in addition to running every interval, the reconciliation loop will get triggered when supported sources change (default: disabled)").BoolVar(&cfg.UpdateEvents) // Miscellaneous flags app.Flag("log-format", "The format in which log messages are printed (default: text, options: text, json)").Default(defaultConfig.LogFormat).EnumVar(&cfg.LogFormat, "text", "json") diff --git a/pkg/apis/externaldns/types_test.go b/pkg/apis/externaldns/types_test.go index 80c68af235..a3fdd87cb6 100644 --- a/pkg/apis/externaldns/types_test.go +++ b/pkg/apis/externaldns/types_test.go @@ -80,6 +80,7 @@ var ( Interval: time.Minute, Once: false, DryRun: false, + UpdateEvents: false, LogFormat: "text", MetricsAddress: ":7979", LogLevel: logrus.InfoLevel.String(), @@ -151,6 +152,7 @@ var ( Interval: 10 * time.Minute, Once: true, DryRun: true, + UpdateEvents: true, LogFormat: "json", MetricsAddress: "127.0.0.1:9099", LogLevel: logrus.DebugLevel.String(), @@ -312,6 +314,7 @@ func TestParseFlags(t *testing.T) { "--interval=10m", "--once", "--dry-run", + "--events", "--log-format=json", "--metrics-address=127.0.0.1:9099", "--log-level=debug", @@ -390,6 +393,7 @@ func TestParseFlags(t *testing.T) { "EXTERNAL_DNS_INTERVAL": "10m", "EXTERNAL_DNS_ONCE": "1", "EXTERNAL_DNS_DRY_RUN": "1", + "EXTERNAL_DNS_EVENTS": "1", "EXTERNAL_DNS_LOG_FORMAT": "json", "EXTERNAL_DNS_METRICS_ADDRESS": "127.0.0.1:9099", "EXTERNAL_DNS_LOG_LEVEL": "debug", diff --git a/source/cloudfoundry.go b/source/cloudfoundry.go index e136860055..9145a1601a 100644 --- a/source/cloudfoundry.go +++ b/source/cloudfoundry.go @@ -18,6 +18,7 @@ package source import ( "net/url" + "time" cfclient "github.com/cloudfoundry-community/go-cfclient" "github.com/kubernetes-sigs/external-dns/endpoint" @@ -35,6 +36,9 @@ func NewCloudFoundrySource(cfClient *cfclient.Client) (Source, error) { }, nil } +func (rs *cloudfoundrySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +} + // Endpoints returns endpoint objects func (rs *cloudfoundrySource) Endpoints() ([]*endpoint.Endpoint, error) { endpoints := []*endpoint.Endpoint{} diff --git a/source/connector.go b/source/connector.go index 4adadea5ce..f9449beff4 100644 --- a/source/connector.go +++ b/source/connector.go @@ -63,3 +63,6 @@ func (cs *connectorSource) Endpoints() ([]*endpoint.Endpoint, error) { return endpoints, nil } + +func (cs *connectorSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +} diff --git a/source/crd.go b/source/crd.go index 66c464c7bd..82b22f32c8 100644 --- a/source/crd.go +++ b/source/crd.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/kubernetes-sigs/external-dns/endpoint" log "github.com/sirupsen/logrus" @@ -107,6 +108,9 @@ func NewCRDSource(crdClient rest.Interface, namespace, kind string, scheme *runt }, nil } +func (cs *crdSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +} + // Endpoints returns endpoint objects. func (cs *crdSource) Endpoints() ([]*endpoint.Endpoint, error) { endpoints := []*endpoint.Endpoint{} diff --git a/source/dedup_source.go b/source/dedup_source.go index ac32b159c7..62880ea43f 100644 --- a/source/dedup_source.go +++ b/source/dedup_source.go @@ -17,6 +17,8 @@ limitations under the License. package source import ( + "time" + log "github.com/sirupsen/logrus" "github.com/kubernetes-sigs/external-dns/endpoint" @@ -56,3 +58,7 @@ func (ms *dedupSource) Endpoints() ([]*endpoint.Endpoint, error) { return result, nil } + +func (ms *dedupSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { + ms.source.AddEventHandler(handler, stopChan, minInterval) +} diff --git a/source/empty.go b/source/empty.go index 591a75c5dc..06ab40c0fe 100644 --- a/source/empty.go +++ b/source/empty.go @@ -16,11 +16,17 @@ limitations under the License. package source -import "github.com/kubernetes-sigs/external-dns/endpoint" +import ( + "github.com/kubernetes-sigs/external-dns/endpoint" + "time" +) // emptySource is a Source that returns no endpoints. type emptySource struct{} +func (e *emptySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +} + // Endpoints collects endpoints of all nested Sources and returns them in a single slice. func (e *emptySource) Endpoints() ([]*endpoint.Endpoint, error) { return []*endpoint.Endpoint{}, nil diff --git a/source/fake.go b/source/fake.go index be3f55af13..29ba4fce06 100644 --- a/source/fake.go +++ b/source/fake.go @@ -54,6 +54,9 @@ func NewFakeSource(fqdnTemplate string) (Source, error) { }, nil } +func (sc *fakeSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +} + // Endpoints returns endpoint objects. func (sc *fakeSource) Endpoints() ([]*endpoint.Endpoint, error) { endpoints := make([]*endpoint.Endpoint, 10) diff --git a/source/gateway.go b/source/gateway.go index 17071be33c..78961c6cd7 100644 --- a/source/gateway.go +++ b/source/gateway.go @@ -22,6 +22,7 @@ import ( "sort" "strings" "text/template" + "time" log "github.com/sirupsen/logrus" @@ -150,6 +151,9 @@ func (sc *gatewaySource) Endpoints() ([]*endpoint.Endpoint, error) { return endpoints, nil } +func (sc *gatewaySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +} + func (sc *gatewaySource) endpointsFromTemplate(config *istiomodel.Config) ([]*endpoint.Endpoint, error) { // Process the whole template string var buf bytes.Buffer diff --git a/source/ingress.go b/source/ingress.go index bff985af7a..d076063083 100644 --- a/source/ingress.go +++ b/source/ingress.go @@ -35,6 +35,7 @@ import ( extinformers "k8s.io/client-go/informers/extensions/v1beta1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/util/async" ) const ( @@ -56,6 +57,7 @@ type ingressSource struct { combineFQDNAnnotation bool ignoreHostnameAnnotation bool ingressInformer extinformers.IngressInformer + runner *async.BoundedFrequencyRunner } // NewIngressSource creates a new ingressSource with the given config. @@ -91,7 +93,7 @@ func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilt // wait for the local cache to be populated. err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) { - return ingressInformer.Informer().HasSynced() == true, nil + return ingressInformer.Informer().HasSynced(), nil }) if err != nil { return nil, fmt.Errorf("failed to sync cache: %v", err) @@ -303,3 +305,32 @@ func targetsFromIngressStatus(status v1beta1.IngressStatus) endpoint.Targets { return targets } + +func (sc *ingressSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { + // Add custom resource event handler + log.Debug("Adding (bounded) event handler for ingress") + + maxInterval := 24 * time.Hour // handler will be called if it has not run in 24 hours + burst := 2 // allow up to two handler burst calls + log.Debugf("Adding handler to BoundedFrequencyRunner with minInterval: %v, syncPeriod: %v, bursts: %d", + minInterval, maxInterval, burst) + sc.runner = async.NewBoundedFrequencyRunner("ingress-handler", func() { + _ = handler() + }, minInterval, maxInterval, burst) + go sc.runner.Loop(stopChan) + + // run the handler function as soon as the BoundedFrequencyRunner will allow when an update occurs + sc.ingressInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + sc.runner.Run() + }, + UpdateFunc: func(old interface{}, new interface{}) { + sc.runner.Run() + }, + DeleteFunc: func(obj interface{}) { + sc.runner.Run() + }, + }, + ) +} diff --git a/source/ingressroute.go b/source/ingressroute.go index 82b42d8916..8ea93219dd 100644 --- a/source/ingressroute.go +++ b/source/ingressroute.go @@ -331,3 +331,6 @@ func parseContourLoadBalancerService(service string) (namespace, name string, er return } + +func (sc *ingressRouteSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +} diff --git a/source/multi_source.go b/source/multi_source.go index 360c9c87ff..6cba009df2 100644 --- a/source/multi_source.go +++ b/source/multi_source.go @@ -16,7 +16,10 @@ limitations under the License. package source -import "github.com/kubernetes-sigs/external-dns/endpoint" +import ( + "github.com/kubernetes-sigs/external-dns/endpoint" + "time" +) // multiSource is a Source that merges the endpoints of its nested Sources. type multiSource struct { @@ -39,6 +42,12 @@ func (ms *multiSource) Endpoints() ([]*endpoint.Endpoint, error) { return result, nil } +func (ms *multiSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { + for _, s := range ms.children { + s.AddEventHandler(handler, stopChan, minInterval) + } +} + // NewMultiSource creates a new multiSource. func NewMultiSource(children []Source) Source { return &multiSource{children: children} diff --git a/source/node.go b/source/node.go index 4364074f3b..df2e3ddf3d 100644 --- a/source/node.go +++ b/source/node.go @@ -167,6 +167,9 @@ func (ns *nodeSource) Endpoints() ([]*endpoint.Endpoint, error) { return endpointsSlice, nil } +func (ns *nodeSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { +} + // nodeAddress returns node's externalIP and if that's not found, node's internalIP // basically what k8s.io/kubernetes/pkg/util/node.GetPreferredNodeAddress does func (ns *nodeSource) nodeAddresses(node *v1.Node) ([]string, error) { diff --git a/source/service.go b/source/service.go index 4a70b4c39f..a88d52b615 100644 --- a/source/service.go +++ b/source/service.go @@ -22,6 +22,7 @@ import ( "sort" "strings" "text/template" + "time" kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" @@ -35,7 +36,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" - "time" + "k8s.io/kubernetes/pkg/util/async" "github.com/kubernetes-sigs/external-dns/endpoint" ) @@ -64,6 +65,7 @@ type serviceSource struct { podInformer coreinformers.PodInformer nodeInformer coreinformers.NodeInformer serviceTypeFilter map[string]struct{} + runner *async.BoundedFrequencyRunner } // NewServiceSource creates a new serviceSource with the given config. @@ -92,21 +94,18 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt serviceInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - log.Debug("service added") }, }, ) podInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - log.Debug("pod added") }, }, ) nodeInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - log.Debug("node added") }, }, ) @@ -116,7 +115,7 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt // wait for the local cache to be populated. err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) { - return serviceInformer.Informer().HasSynced() == true, nil + return serviceInformer.Informer().HasSynced(), nil }) if err != nil { return nil, fmt.Errorf("failed to sync cache: %v", err) @@ -549,3 +548,32 @@ func (sc *serviceSource) extractNodePortEndpoints(svc *v1.Service, nodeTargets e return endpoints } + +func (sc *serviceSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) { + // Add custom resource event handler + log.Debug("Adding (bounded) event handler for service") + + maxInterval := 24 * time.Hour // handler will be called if it has not run in 24 hours + burst := 2 // allow up to two handler burst calls + log.Debugf("Adding handler to BoundedFrequencyRunner with minInterval: %v, syncPeriod: %v, bursts: %d", + minInterval, maxInterval, burst) + sc.runner = async.NewBoundedFrequencyRunner("service-handler", func() { + _ = handler() + }, minInterval, maxInterval, burst) + go sc.runner.Loop(stopChan) + + // run the handler function as soon as the BoundedFrequencyRunner will allow when an update occurs + sc.serviceInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + sc.runner.Run() + }, + UpdateFunc: func(old interface{}, new interface{}) { + sc.runner.Run() + }, + DeleteFunc: func(obj interface{}) { + sc.runner.Run() + }, + }, + ) +} diff --git a/source/source.go b/source/source.go index a2b265587d..71f1de9d0e 100644 --- a/source/source.go +++ b/source/source.go @@ -22,6 +22,7 @@ import ( "net" "strconv" "strings" + "time" "github.com/kubernetes-sigs/external-dns/endpoint" ) @@ -57,6 +58,9 @@ const ( // Source defines the interface Endpoint sources should implement. type Source interface { Endpoints() ([]*endpoint.Endpoint, error) + // AddEventHandler adds an event handler function that's called when (supported) sources have changed. + // The handler should not be called more than than once per time.Duration and not again after stop channel is closed. + AddEventHandler(func() error, <-chan struct{}, time.Duration) } func getTTLFromAnnotations(annotations map[string]string) (endpoint.TTL, error) {