Skip to content

Commit

Permalink
Merge pull request #591 from liztio/recordprovider
Browse files Browse the repository at this point in the history
✨ Let users specify their own EventBroadcaster for the manager
  • Loading branch information
k8s-ci-robot committed Sep 11, 2019
2 parents 6993406 + 5f7374f commit 524b614
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
5 changes: 2 additions & 3 deletions pkg/internal/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,13 @@ type provider struct {
}

// NewProvider create a new Provider instance.
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error) {
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) {
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to init clientSet: %v", err)
}

p := &provider{scheme: scheme, logger: logger}
p.eventBroadcaster = record.NewBroadcaster()
p := &provider{scheme: scheme, logger: logger, eventBroadcaster: broadcaster}
p.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
p.eventBroadcaster.StartEventWatcher(
func(e *corev1.Event) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/internal/recorder/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/internal/recorder"
)

var _ = Describe("recorder.Provider", func() {
Describe("NewProvider", func() {
It("should return a provider instance and a nil error.", func() {
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{})
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
Expect(provider).NotTo(BeNil())
Expect(err).NotTo(HaveOccurred())
})
Expand All @@ -36,13 +37,13 @@ var _ = Describe("recorder.Provider", func() {
// Invalid the config
cfg1 := *cfg
cfg1.ContentType = "invalid-type"
_, err := recorder.NewProvider(&cfg1, scheme.Scheme, tlog.NullLogger{})
_, err := recorder.NewProvider(&cfg1, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
Expect(err.Error()).To(ContainSubstring("failed to init clientSet"))
})
})
Describe("GetEventRecorder", func() {
It("should return a recorder instance.", func() {
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{})
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
Expect(err).NotTo(HaveOccurred())

recorder := provider.GetEventRecorderFor("test")
Expand Down
12 changes: 10 additions & 2 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,12 @@ type Options struct {
// use the cache for reads and the client for writes.
NewClient NewClientFunc

// EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API
// Use this to customize the event correlator and spam filter
EventBroadcaster record.EventBroadcaster

// Dependency injection for testing
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error)
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error)
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
newMetricsListener func(addr string) (net.Listener, error)
}
Expand Down Expand Up @@ -231,7 +235,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
// Create the recorder provider to inject event recorders for the components.
// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
// to the particular controller that it's being injected into, rather than a generic one like is here.
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"))
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"), options.EventBroadcaster)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -342,5 +346,9 @@ func setOptionsDefaults(options Options) Options {
options.RetryPeriod = &retryPeriod
}

if options.EventBroadcaster == nil {
options.EventBroadcaster = record.NewBroadcaster()
}

return options
}
3 changes: 2 additions & 1 deletion pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -111,7 +112,7 @@ var _ = Describe("manger.Manager", func() {

It("should return an error it can't create a recorder.Provider", func(done Done) {
m, err := New(cfg, Options{
newRecorderProvider: func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error) {
newRecorderProvider: func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) {
return nil, fmt.Errorf("expected error")
},
})
Expand Down

0 comments on commit 524b614

Please sign in to comment.