Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hold the event source when there are no listeners #15725

Merged
merged 9 commits into from
May 15, 2021
6 changes: 6 additions & 0 deletions modules/eventsource/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ type Manager struct {
mutex sync.Mutex

messengers map[int64]*Messenger
connection chan struct{}
}

var manager *Manager

func init() {
manager = &Manager{
messengers: make(map[int64]*Messenger),
connection: make(chan struct{}, 1),
}
}

Expand All @@ -36,6 +38,10 @@ func (m *Manager) Register(uid int64) <-chan *Event {
messenger = NewMessenger(uid)
m.messengers[uid] = messenger
}
select {
case m.connection <- struct{}{}:
default:
}
m.mutex.Unlock()
return messenger.Register()
}
Expand Down
29 changes: 29 additions & 0 deletions modules/eventsource/manager_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,35 @@ loop:
timer.Stop()
break loop
case <-timer.C:
m.mutex.Lock()
connectionCount := len(m.messengers)
if connectionCount == 0 {
log.Trace("Event source has no listeners")
// empty the connection channel
select {
case <-m.connection:
default:
}
}
m.mutex.Unlock()
if connectionCount == 0 {
// No listeners so the source can be paused
log.Trace("Pausing the eventsource")
select {
case <-ctx.Done():
break loop
case <-m.connection:
log.Trace("Connection detected - restarting the eventsource")
// OK we're back so lets reset the timer and start again
// We won't change the "then" time because there could be concurrency issues
select {
case <-timer.C:
default:
}
continue
}
}

now := timeutil.TimeStampNow().Add(-2)

uidCounts, err := models.GetUIDsAndNotificationCounts(then, now)
Expand Down