Skip to content

Commit

Permalink
🐛 Event recorder should not send events when broadcaster is stopped
Browse files Browse the repository at this point in the history
This change solves a race condition when the manager is shutting down
and reconcilers are still in the process of sending events through the
recorder. If the shutdown timeout is hit too early, the event recorder
panics because we're still trying to send events through the recorder.

To avoid this issue, we introduce a RWMutex between the provider and
event recorder. When the broadcaster is shutdown, events are now
dropped.

Signed-off-by: Vince Prignano <vincepri@vmware.com>
  • Loading branch information
vincepri committed Mar 24, 2021
1 parent 197751d commit f89fe47
Showing 1 changed file with 24 additions and 3 deletions.
27 changes: 24 additions & 3 deletions pkg/internal/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithPr
// Provider is a recorder.Provider that records events to the k8s API server
// and to a logr Logger.
type Provider struct {
lock sync.RWMutex
stopped bool

// scheme to specify when creating a recorder
scheme *runtime.Scheme
// logger is the logger to use when logging diagnostic event info
Expand Down Expand Up @@ -70,7 +73,10 @@ func (p *Provider) Stop(shutdownCtx context.Context) {
// an invocation of getBroadcaster.
broadcaster := p.getBroadcaster()
if p.stopBroadcaster {
p.lock.Lock()
broadcaster.Shutdown()
p.stopped = true
p.lock.Unlock()
}
close(doneCh)
}()
Expand Down Expand Up @@ -144,13 +150,28 @@ func (l *lazyRecorder) ensureRecording() {

func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) {
l.ensureRecording()
l.rec.Event(object, eventtype, reason, message)

l.prov.lock.RLock()
if !l.prov.stopped {
l.rec.Event(object, eventtype, reason, message)
}
l.prov.lock.RUnlock()
}
func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
l.ensureRecording()
l.rec.Eventf(object, eventtype, reason, messageFmt, args...)

l.prov.lock.RLock()
if !l.prov.stopped {
l.rec.Eventf(object, eventtype, reason, messageFmt, args...)
}
l.prov.lock.RUnlock()
}
func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
l.ensureRecording()
l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)

l.prov.lock.RLock()
if !l.prov.stopped {
l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)
}
l.prov.lock.RUnlock()
}

0 comments on commit f89fe47

Please sign in to comment.