diff options
author | zeripath <art27@cantab.net> | 2021-05-15 23:46:13 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-15 23:46:13 +0200 |
commit | 8e32eeb5deed2da2dc2a648f62cba2613b566f71 (patch) | |
tree | 3a253b370978be5438e78c88c2adbdf479b8af5e /modules/eventsource | |
parent | Create a session on ReverseProxy and ensure that ReverseProxy users cannot ch... (diff) | |
download | forgejo-8e32eeb5deed2da2dc2a648f62cba2613b566f71.tar.xz forgejo-8e32eeb5deed2da2dc2a648f62cba2613b566f71.zip |
Hold the event source when there are no listeners (#15725)
* Hold the event source when there are no listeners
The event source does not need to run when there are no listeners. Therefore
pause it when there are none.
* add some more logging
Signed-off-by: Andrew Thornton <art27@cantab.net>
Diffstat (limited to '')
-rw-r--r-- | modules/eventsource/manager.go | 6 | ||||
-rw-r--r-- | modules/eventsource/manager_run.go | 29 |
2 files changed, 35 insertions, 0 deletions
diff --git a/modules/eventsource/manager.go b/modules/eventsource/manager.go index 212fe60569..812d673992 100644 --- a/modules/eventsource/manager.go +++ b/modules/eventsource/manager.go @@ -13,6 +13,7 @@ type Manager struct { mutex sync.Mutex messengers map[int64]*Messenger + connection chan struct{} } var manager *Manager @@ -20,6 +21,7 @@ var manager *Manager func init() { manager = &Manager{ messengers: make(map[int64]*Messenger), + connection: make(chan struct{}, 1), } } @@ -36,6 +38,10 @@ func (m *Manager) Register(uid int64) <-chan *Event { messenger = NewMessenger(uid) m.messengers[uid] = messenger } + select { + case m.connection <- struct{}{}: + default: + } m.mutex.Unlock() return messenger.Register() } diff --git a/modules/eventsource/manager_run.go b/modules/eventsource/manager_run.go index ccfe2e0709..60598ecb49 100644 --- a/modules/eventsource/manager_run.go +++ b/modules/eventsource/manager_run.go @@ -34,6 +34,35 @@ loop: timer.Stop() break loop case <-timer.C: + m.mutex.Lock() + connectionCount := len(m.messengers) + if connectionCount == 0 { + log.Trace("Event source has no listeners") + // empty the connection channel + select { + case <-m.connection: + default: + } + } + m.mutex.Unlock() + if connectionCount == 0 { + // No listeners so the source can be paused + log.Trace("Pausing the eventsource") + select { + case <-ctx.Done(): + break loop + case <-m.connection: + log.Trace("Connection detected - restarting the eventsource") + // OK we're back so lets reset the timer and start again + // We won't change the "then" time because there could be concurrency issues + select { + case <-timer.C: + default: + } + continue + } + } + now := timeutil.TimeStampNow().Add(-2) uidCounts, err := models.GetUIDsAndNotificationCounts(then, now) |