Skip to content

Commit

Permalink
Add --events flag to use informers to automatically trigger sync loop…
Browse files Browse the repository at this point in the history
… on adds/updates/deletes for supported ingress and service sources.
  • Loading branch information
jlamillan committed Nov 20, 2019
1 parent 92e3263 commit e5d8857
Show file tree
Hide file tree
Showing 20 changed files with 207 additions and 11 deletions.
42 changes: 42 additions & 0 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"reflect"
"testing"
"time"

"github.com/kubernetes-sigs/external-dns/endpoint"
"github.com/kubernetes-sigs/external-dns/internal/testutils"
Expand Down Expand Up @@ -151,3 +152,44 @@ func TestRunOnce(t *testing.T) {
// Validate that the mock source was called.
source.AssertExpectations(t)
}

// TestSourceEventHandler tests that the Controller can use a Source's registered handler as a callback.
func TestSourceEventHandler(t *testing.T) {
source := new(testutils.MockSource)

handlerCh := make(chan bool)
timeoutCh := make(chan bool, 1)
stopChan := make(chan struct{}, 1)

ctrl := &Controller{
Source: source,
Registry: nil,
Policy: &plan.SyncPolicy{},
}

// Define and register a simple handler that sends a message to a channel to show it was called.
handler := func() error {
handlerCh <- true
return nil
}
// Example of preventing handler from being called more than once every 5 seconds.
ctrl.Source.AddEventHandler(handler, stopChan, 5*time.Second)

// Send timeout message after 10 seconds to fail test if handler is not called.
go func() {
time.Sleep(10 * time.Second)
timeoutCh <- true
}()

// Wait until we either receive a message from handlerCh or timeoutCh channel after 10 seconds.
select {
case msg := <-handlerCh:
assert.True(t, msg)
case <-timeoutCh:
assert.Fail(t, "timed out waiting for event handler to be called")
}

close(stopChan)
close(handlerCh)
close(timeoutCh)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ require (
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/client-go v10.0.0+incompatible
k8s.io/kube-openapi v0.0.0-20190401085232-94e1e7b7574c // indirect
k8s.io/kubernetes v1.14.1
)

replace (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,8 @@ k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH
k8s.io/kube-openapi v0.0.0-20190401085232-94e1e7b7574c h1:kJCzg2vGCzah5icgkKR7O1Dzn0NA2iGlym27sb0ZfGE=
k8s.io/kube-openapi v0.0.0-20190401085232-94e1e7b7574c/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/kubernetes v1.13.1/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/kubernetes v1.14.1 h1:I9F52h5sqVxBmoSsBlNQ0YygNcukDilkpGxUbJRoBoY=
k8s.io/kubernetes v1.14.1/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0=
k8s.io/utils v0.0.0-20190607212802-c55fbcfc754a/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
mvdan.cc/unparam v0.0.0-20190720180237-d51796306d8f/go.mod h1:4G1h5nDURzA3bwVMZIVpwbkw+04kSxk3rAtzlimaUJw=
Expand Down
22 changes: 22 additions & 0 deletions internal/testutils/mock_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package testutils

import (
"time"

"github.com/stretchr/testify/mock"

"github.com/kubernetes-sigs/external-dns/endpoint"
Expand All @@ -38,3 +40,23 @@ func (m *MockSource) Endpoints() ([]*endpoint.Endpoint, error) {

return endpoints.([]*endpoint.Endpoint), args.Error(1)
}

// AddEventHandler adds an event handler function that's called when sources that support such a thing have changed.
func (m *MockSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
// Execute callback handler no more than once per minInterval, until a message on stopChan is received.
go func() {
var lastCallbackTime time.Time
for {
select {
case <-stopChan:
return
default:
now := time.Now()
if now.After(lastCallbackTime.Add(minInterval)) {
handler()
lastCallbackTime = time.Now()
}
}
}
}()
}
20 changes: 17 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -90,9 +91,15 @@ func main() {

// Lookup all the selected sources by names and pass them the desired configuration.
sources, err := source.ByNames(&source.SingletonClientGenerator{
KubeConfig: cfg.KubeConfig,
KubeMaster: cfg.Master,
RequestTimeout: cfg.RequestTimeout,
KubeConfig: cfg.KubeConfig,
KubeMaster: cfg.Master,
// If update events are enabled, disable timeout.
RequestTimeout: func() time.Duration {
if cfg.UpdateEvents {
return 0
}
return cfg.RequestTimeout
}(),
}, cfg.Sources, sourceCfg)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -265,6 +272,13 @@ func main() {
Interval: cfg.Interval,
}

if cfg.UpdateEvents {
// Add RunOnce as the handler function that will be called when ingress/service sources have changed.
// Note that k8s Informers will perform an initial list operation, which results in the handler
// function initially being called for every Service/Ingress that exists limted by minInterval.
ctrl.Source.AddEventHandler(ctrl.RunOnce, stopChan, 1*time.Minute)
}

if cfg.Once {
err := ctrl.RunOnce()
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/externaldns/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type Config struct {
Interval time.Duration
Once bool
DryRun bool
UpdateEvents bool
LogFormat string
MetricsAddress string
LogLevel string
Expand Down Expand Up @@ -192,6 +193,7 @@ var defaultConfig = &Config{
Interval: time.Minute,
Once: false,
DryRun: false,
UpdateEvents: false,
LogFormat: "text",
MetricsAddress: ":7979",
LogLevel: logrus.InfoLevel.String(),
Expand Down Expand Up @@ -371,6 +373,7 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("interval", "The interval between two consecutive synchronizations in duration format (default: 1m)").Default(defaultConfig.Interval.String()).DurationVar(&cfg.Interval)
app.Flag("once", "When enabled, exits the synchronization loop after the first iteration (default: disabled)").BoolVar(&cfg.Once)
app.Flag("dry-run", "When enabled, prints DNS record changes rather than actually performing them (default: disabled)").BoolVar(&cfg.DryRun)
app.Flag("events", "When enabled, in addition to running every interval, the reconciliation loop will get triggered when supported sources change (default: disabled)").BoolVar(&cfg.UpdateEvents)

// Miscellaneous flags
app.Flag("log-format", "The format in which log messages are printed (default: text, options: text, json)").Default(defaultConfig.LogFormat).EnumVar(&cfg.LogFormat, "text", "json")
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/externaldns/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ var (
Interval: time.Minute,
Once: false,
DryRun: false,
UpdateEvents: false,
LogFormat: "text",
MetricsAddress: ":7979",
LogLevel: logrus.InfoLevel.String(),
Expand Down Expand Up @@ -151,6 +152,7 @@ var (
Interval: 10 * time.Minute,
Once: true,
DryRun: true,
UpdateEvents: true,
LogFormat: "json",
MetricsAddress: "127.0.0.1:9099",
LogLevel: logrus.DebugLevel.String(),
Expand Down Expand Up @@ -312,6 +314,7 @@ func TestParseFlags(t *testing.T) {
"--interval=10m",
"--once",
"--dry-run",
"--events",
"--log-format=json",
"--metrics-address=127.0.0.1:9099",
"--log-level=debug",
Expand Down Expand Up @@ -390,6 +393,7 @@ func TestParseFlags(t *testing.T) {
"EXTERNAL_DNS_INTERVAL": "10m",
"EXTERNAL_DNS_ONCE": "1",
"EXTERNAL_DNS_DRY_RUN": "1",
"EXTERNAL_DNS_EVENTS": "1",
"EXTERNAL_DNS_LOG_FORMAT": "json",
"EXTERNAL_DNS_METRICS_ADDRESS": "127.0.0.1:9099",
"EXTERNAL_DNS_LOG_LEVEL": "debug",
Expand Down
4 changes: 4 additions & 0 deletions source/cloudfoundry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package source

import (
"net/url"
"time"

cfclient "github.com/cloudfoundry-community/go-cfclient"
"github.com/kubernetes-sigs/external-dns/endpoint"
Expand All @@ -35,6 +36,9 @@ func NewCloudFoundrySource(cfClient *cfclient.Client) (Source, error) {
}, nil
}

func (rs *cloudfoundrySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}

// Endpoints returns endpoint objects
func (rs *cloudfoundrySource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := []*endpoint.Endpoint{}
Expand Down
3 changes: 3 additions & 0 deletions source/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,6 @@ func (cs *connectorSource) Endpoints() ([]*endpoint.Endpoint, error) {

return endpoints, nil
}

func (cs *connectorSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}
4 changes: 4 additions & 0 deletions source/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/kubernetes-sigs/external-dns/endpoint"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -107,6 +108,9 @@ func NewCRDSource(crdClient rest.Interface, namespace, kind string, scheme *runt
}, nil
}

func (cs *crdSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}

// Endpoints returns endpoint objects.
func (cs *crdSource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := []*endpoint.Endpoint{}
Expand Down
6 changes: 6 additions & 0 deletions source/dedup_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package source

import (
"time"

log "github.com/sirupsen/logrus"

"github.com/kubernetes-sigs/external-dns/endpoint"
Expand Down Expand Up @@ -56,3 +58,7 @@ func (ms *dedupSource) Endpoints() ([]*endpoint.Endpoint, error) {

return result, nil
}

func (ms *dedupSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
ms.source.AddEventHandler(handler, stopChan, minInterval)
}
8 changes: 7 additions & 1 deletion source/empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@ limitations under the License.

package source

import "github.com/kubernetes-sigs/external-dns/endpoint"
import (
"github.com/kubernetes-sigs/external-dns/endpoint"
"time"
)

// emptySource is a Source that returns no endpoints.
type emptySource struct{}

func (e *emptySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}

// Endpoints collects endpoints of all nested Sources and returns them in a single slice.
func (e *emptySource) Endpoints() ([]*endpoint.Endpoint, error) {
return []*endpoint.Endpoint{}, nil
Expand Down
3 changes: 3 additions & 0 deletions source/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func NewFakeSource(fqdnTemplate string) (Source, error) {
}, nil
}

func (sc *fakeSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}

// Endpoints returns endpoint objects.
func (sc *fakeSource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := make([]*endpoint.Endpoint, 10)
Expand Down
4 changes: 4 additions & 0 deletions source/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sort"
"strings"
"text/template"
"time"

log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -150,6 +151,9 @@ func (sc *gatewaySource) Endpoints() ([]*endpoint.Endpoint, error) {
return endpoints, nil
}

func (sc *gatewaySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}

func (sc *gatewaySource) endpointsFromTemplate(config *istiomodel.Config) ([]*endpoint.Endpoint, error) {
// Process the whole template string
var buf bytes.Buffer
Expand Down
33 changes: 32 additions & 1 deletion source/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
extinformers "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/util/async"
)

const (
Expand All @@ -56,6 +57,7 @@ type ingressSource struct {
combineFQDNAnnotation bool
ignoreHostnameAnnotation bool
ingressInformer extinformers.IngressInformer
runner *async.BoundedFrequencyRunner
}

// NewIngressSource creates a new ingressSource with the given config.
Expand Down Expand Up @@ -91,7 +93,7 @@ func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilt

// wait for the local cache to be populated.
err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
return ingressInformer.Informer().HasSynced() == true, nil
return ingressInformer.Informer().HasSynced(), nil
})
if err != nil {
return nil, fmt.Errorf("failed to sync cache: %v", err)
Expand Down Expand Up @@ -303,3 +305,32 @@ func targetsFromIngressStatus(status v1beta1.IngressStatus) endpoint.Targets {

return targets
}

func (sc *ingressSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
// Add custom resource event handler
log.Debug("Adding (bounded) event handler for ingress")

maxInterval := 24 * time.Hour // handler will be called if it has not run in 24 hours
burst := 2 // allow up to two handler burst calls
log.Debugf("Adding handler to BoundedFrequencyRunner with minInterval: %v, syncPeriod: %v, bursts: %d",
minInterval, maxInterval, burst)
sc.runner = async.NewBoundedFrequencyRunner("ingress-handler", func() {
_ = handler()
}, minInterval, maxInterval, burst)
go sc.runner.Loop(stopChan)

// run the handler function as soon as the BoundedFrequencyRunner will allow when an update occurs
sc.ingressInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
sc.runner.Run()
},
UpdateFunc: func(old interface{}, new interface{}) {
sc.runner.Run()
},
DeleteFunc: func(obj interface{}) {
sc.runner.Run()
},
},
)
}
3 changes: 3 additions & 0 deletions source/ingressroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,6 @@ func parseContourLoadBalancerService(service string) (namespace, name string, er

return
}

func (sc *ingressRouteSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}
11 changes: 10 additions & 1 deletion source/multi_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ limitations under the License.

package source

import "github.com/kubernetes-sigs/external-dns/endpoint"
import (
"github.com/kubernetes-sigs/external-dns/endpoint"
"time"
)

// multiSource is a Source that merges the endpoints of its nested Sources.
type multiSource struct {
Expand All @@ -39,6 +42,12 @@ func (ms *multiSource) Endpoints() ([]*endpoint.Endpoint, error) {
return result, nil
}

func (ms *multiSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
for _, s := range ms.children {
s.AddEventHandler(handler, stopChan, minInterval)
}
}

// NewMultiSource creates a new multiSource.
func NewMultiSource(children []Source) Source {
return &multiSource{children: children}
Expand Down
Loading

0 comments on commit e5d8857

Please sign in to comment.