Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --watchers flag to allow controller to respond automatically to Ingress or Service updates #687

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"reflect"
"testing"
"time"

"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/internal/testutils"
Expand Down Expand Up @@ -151,3 +152,44 @@ func TestRunOnce(t *testing.T) {
// Validate that the mock source was called.
source.AssertExpectations(t)
}

// TestSourceEventHandler tests that the Controller can use a Source's registered handler as a callback.
func TestSourceEventHandler(t *testing.T) {
source := new(testutils.MockSource)

handlerCh := make(chan bool)
timeoutCh := make(chan bool, 1)
stopChan := make(chan struct{}, 1)

ctrl := &Controller{
Source: source,
Registry: nil,
Policy: &plan.SyncPolicy{},
}

// Define and register a simple handler that sends a message to a channel to show it was called.
handler := func() error {
handlerCh <- true
return nil
}
// Example of preventing handler from being called more than once every 5 seconds.
ctrl.Source.AddEventHandler(handler, stopChan, 5*time.Second)

// Send timeout message after 10 seconds to fail test if handler is not called.
go func() {
time.Sleep(10 * time.Second)
timeoutCh <- true
}()

// Wait until we either receive a message from handlerCh or timeoutCh channel after 10 seconds.
select {
case msg := <-handlerCh:
assert.True(t, msg)
case <-timeoutCh:
assert.Fail(t, "timed out waiting for event handler to be called")
}

close(stopChan)
close(handlerCh)
close(timeoutCh)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/client-go v10.0.0+incompatible
k8s.io/kubernetes v1.14.1
)

replace (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,8 @@ k8s.io/helm v2.13.1+incompatible/go.mod h1:LZzlS4LQBHfciFOurYBFkCMTaZ0D1l+p0teMg
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/kubernetes v1.13.1/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/kubernetes v1.14.1 h1:I9F52h5sqVxBmoSsBlNQ0YygNcukDilkpGxUbJRoBoY=
k8s.io/kubernetes v1.14.1/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0=
k8s.io/utils v0.0.0-20190607212802-c55fbcfc754a/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
mvdan.cc/unparam v0.0.0-20190720180237-d51796306d8f/go.mod h1:4G1h5nDURzA3bwVMZIVpwbkw+04kSxk3rAtzlimaUJw=
Expand Down
23 changes: 22 additions & 1 deletion internal/testutils/mock_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ limitations under the License.
package testutils

import (
"github.com/stretchr/testify/mock"
"time"

"github.com/stretchr/testify/mock"
"sigs.k8s.io/external-dns/endpoint"
)

Expand All @@ -38,3 +39,23 @@ func (m *MockSource) Endpoints() ([]*endpoint.Endpoint, error) {

return endpoints.([]*endpoint.Endpoint), args.Error(1)
}

// AddEventHandler adds an event handler function that's called when sources that support such a thing have changed.
func (m *MockSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
// Execute callback handler no more than once per minInterval, until a message on stopChan is received.
go func() {
var lastCallbackTime time.Time
for {
select {
case <-stopChan:
return
default:
now := time.Now()
if now.After(lastCallbackTime.Add(minInterval)) {
handler()
lastCallbackTime = time.Now()
}
}
}
}()
}
20 changes: 17 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -93,9 +94,15 @@ func main() {

// Lookup all the selected sources by names and pass them the desired configuration.
sources, err := source.ByNames(&source.SingletonClientGenerator{
KubeConfig: cfg.KubeConfig,
KubeMaster: cfg.Master,
RequestTimeout: cfg.RequestTimeout,
KubeConfig: cfg.KubeConfig,
KubeMaster: cfg.Master,
// If update events are enabled, disable timeout.
RequestTimeout: func() time.Duration {
if cfg.UpdateEvents {
return 0
}
return cfg.RequestTimeout
}(),
}, cfg.Sources, sourceCfg)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -281,6 +288,13 @@ func main() {
Interval: cfg.Interval,
}

if cfg.UpdateEvents {
// Add RunOnce as the handler function that will be called when ingress/service sources have changed.
// Note that k8s Informers will perform an initial list operation, which results in the handler
// function initially being called for every Service/Ingress that exists limted by minInterval.
ctrl.Source.AddEventHandler(func() error { return ctrl.RunOnce(ctx) }, stopChan, 1*time.Minute)
}

if cfg.Once {
err := ctrl.RunOnce(ctx)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/externaldns/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type Config struct {
Interval time.Duration
Once bool
DryRun bool
UpdateEvents bool
LogFormat string
MetricsAddress string
LogLevel string
Expand Down Expand Up @@ -201,6 +202,7 @@ var defaultConfig = &Config{
Interval: time.Minute,
Once: false,
DryRun: false,
UpdateEvents: false,
LogFormat: "text",
MetricsAddress: ":7979",
LogLevel: logrus.InfoLevel.String(),
Expand Down Expand Up @@ -382,6 +384,7 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("interval", "The interval between two consecutive synchronizations in duration format (default: 1m)").Default(defaultConfig.Interval.String()).DurationVar(&cfg.Interval)
app.Flag("once", "When enabled, exits the synchronization loop after the first iteration (default: disabled)").BoolVar(&cfg.Once)
app.Flag("dry-run", "When enabled, prints DNS record changes rather than actually performing them (default: disabled)").BoolVar(&cfg.DryRun)
app.Flag("events", "When enabled, in addition to running every interval, the reconciliation loop will get triggered when supported sources change (default: disabled)").BoolVar(&cfg.UpdateEvents)

// Miscellaneous flags
app.Flag("log-format", "The format in which log messages are printed (default: text, options: text, json)").Default(defaultConfig.LogFormat).EnumVar(&cfg.LogFormat, "text", "json")
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/externaldns/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ var (
Interval: time.Minute,
Once: false,
DryRun: false,
UpdateEvents: false,
LogFormat: "text",
MetricsAddress: ":7979",
LogLevel: logrus.InfoLevel.String(),
Expand Down Expand Up @@ -157,6 +158,7 @@ var (
Interval: 10 * time.Minute,
Once: true,
DryRun: true,
UpdateEvents: true,
LogFormat: "json",
MetricsAddress: "127.0.0.1:9099",
LogLevel: logrus.DebugLevel.String(),
Expand Down Expand Up @@ -257,6 +259,7 @@ func TestParseFlags(t *testing.T) {
"--interval=10m",
"--once",
"--dry-run",
"--events",
"--log-format=json",
"--metrics-address=127.0.0.1:9099",
"--log-level=debug",
Expand Down Expand Up @@ -338,6 +341,7 @@ func TestParseFlags(t *testing.T) {
"EXTERNAL_DNS_INTERVAL": "10m",
"EXTERNAL_DNS_ONCE": "1",
"EXTERNAL_DNS_DRY_RUN": "1",
"EXTERNAL_DNS_EVENTS": "1",
"EXTERNAL_DNS_LOG_FORMAT": "json",
"EXTERNAL_DNS_METRICS_ADDRESS": "127.0.0.1:9099",
"EXTERNAL_DNS_LOG_LEVEL": "debug",
Expand Down
4 changes: 4 additions & 0 deletions source/cloudfoundry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package source

import (
"net/url"
"time"

cfclient "github.com/cloudfoundry-community/go-cfclient"

Expand All @@ -36,6 +37,9 @@ func NewCloudFoundrySource(cfClient *cfclient.Client) (Source, error) {
}, nil
}

func (rs *cloudfoundrySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}

// Endpoints returns endpoint objects
func (rs *cloudfoundrySource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := []*endpoint.Endpoint{}
Expand Down
3 changes: 3 additions & 0 deletions source/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,6 @@ func (cs *connectorSource) Endpoints() ([]*endpoint.Endpoint, error) {

return endpoints, nil
}

func (cs *connectorSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}
4 changes: 4 additions & 0 deletions source/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"strings"
"time"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -108,6 +109,9 @@ func NewCRDSource(crdClient rest.Interface, namespace, kind string, scheme *runt
}, nil
}

func (cs *crdSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}

// Endpoints returns endpoint objects.
func (cs *crdSource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := []*endpoint.Endpoint{}
Expand Down
6 changes: 6 additions & 0 deletions source/dedup_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package source

import (
"time"

log "github.com/sirupsen/logrus"

"sigs.k8s.io/external-dns/endpoint"
Expand Down Expand Up @@ -56,3 +58,7 @@ func (ms *dedupSource) Endpoints() ([]*endpoint.Endpoint, error) {

return result, nil
}

func (ms *dedupSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
ms.source.AddEventHandler(handler, stopChan, minInterval)
}
9 changes: 8 additions & 1 deletion source/empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@ limitations under the License.

package source

import "sigs.k8s.io/external-dns/endpoint"
import (
"time"

"sigs.k8s.io/external-dns/endpoint"
)

// emptySource is a Source that returns no endpoints.
type emptySource struct{}

func (e *emptySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}

// Endpoints collects endpoints of all nested Sources and returns them in a single slice.
func (e *emptySource) Endpoints() ([]*endpoint.Endpoint, error) {
return []*endpoint.Endpoint{}, nil
Expand Down
3 changes: 3 additions & 0 deletions source/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func NewFakeSource(fqdnTemplate string) (Source, error) {
}, nil
}

func (sc *fakeSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}

// Endpoints returns endpoint objects.
func (sc *fakeSource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := make([]*endpoint.Endpoint, 10)
Expand Down
3 changes: 3 additions & 0 deletions source/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ func (sc *gatewaySource) Endpoints() ([]*endpoint.Endpoint, error) {
return endpoints, nil
}

func (sc *gatewaySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}

func (sc *gatewaySource) endpointsFromTemplate(config *istiomodel.Config) ([]*endpoint.Endpoint, error) {
// Process the whole template string
var buf bytes.Buffer
Expand Down
33 changes: 32 additions & 1 deletion source/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
extinformers "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/util/async"

"sigs.k8s.io/external-dns/endpoint"
)
Expand All @@ -56,6 +57,7 @@ type ingressSource struct {
combineFQDNAnnotation bool
ignoreHostnameAnnotation bool
ingressInformer extinformers.IngressInformer
runner *async.BoundedFrequencyRunner
}

// NewIngressSource creates a new ingressSource with the given config.
Expand Down Expand Up @@ -91,7 +93,7 @@ func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilt

// wait for the local cache to be populated.
err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
return ingressInformer.Informer().HasSynced() == true, nil
return ingressInformer.Informer().HasSynced(), nil
})
if err != nil {
return nil, fmt.Errorf("failed to sync cache: %v", err)
Expand Down Expand Up @@ -303,3 +305,32 @@ func targetsFromIngressStatus(status v1beta1.IngressStatus) endpoint.Targets {

return targets
}

func (sc *ingressSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
// Add custom resource event handler
log.Debug("Adding (bounded) event handler for ingress")

maxInterval := 24 * time.Hour // handler will be called if it has not run in 24 hours
burst := 2 // allow up to two handler burst calls
log.Debugf("Adding handler to BoundedFrequencyRunner with minInterval: %v, syncPeriod: %v, bursts: %d",
minInterval, maxInterval, burst)
sc.runner = async.NewBoundedFrequencyRunner("ingress-handler", func() {
_ = handler()
}, minInterval, maxInterval, burst)
go sc.runner.Loop(stopChan)

// run the handler function as soon as the BoundedFrequencyRunner will allow when an update occurs
sc.ingressInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
sc.runner.Run()
},
UpdateFunc: func(old interface{}, new interface{}) {
sc.runner.Run()
},
DeleteFunc: func(obj interface{}) {
sc.runner.Run()
},
},
)
}
3 changes: 3 additions & 0 deletions source/ingressroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,6 @@ func parseContourLoadBalancerService(service string) (namespace, name string, er

return
}

func (sc *ingressRouteSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}
12 changes: 11 additions & 1 deletion source/multi_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ limitations under the License.

package source

import "sigs.k8s.io/external-dns/endpoint"
import (
"time"

"sigs.k8s.io/external-dns/endpoint"
)

// multiSource is a Source that merges the endpoints of its nested Sources.
type multiSource struct {
Expand All @@ -39,6 +43,12 @@ func (ms *multiSource) Endpoints() ([]*endpoint.Endpoint, error) {
return result, nil
}

func (ms *multiSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
for _, s := range ms.children {
s.AddEventHandler(handler, stopChan, minInterval)
}
}

// NewMultiSource creates a new multiSource.
func NewMultiSource(children []Source) Source {
return &multiSource{children: children}
Expand Down
Loading