From e523ea63a8b17510bf3d30dfafc74453e88e7235 Mon Sep 17 00:00:00 2001 From: James Houlahan Date: Thu, 8 Sep 2022 17:16:09 +0200 Subject: [PATCH] fix: Use QueuedChannel for serve errors The queued channel means we can push errors without blocking execution. --- builder.go | 3 ++- server.go | 9 +++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/builder.go b/builder.go index 230062ea..7d5806ab 100644 --- a/builder.go +++ b/builder.go @@ -8,6 +8,7 @@ import ( "github.com/ProtonMail/gluon/internal" "github.com/ProtonMail/gluon/internal/backend" + "github.com/ProtonMail/gluon/internal/queue" "github.com/ProtonMail/gluon/internal/session" "github.com/ProtonMail/gluon/profiling" "github.com/ProtonMail/gluon/reporter" @@ -58,7 +59,7 @@ func (builder *serverBuilder) build() (*Server, error) { dir: builder.dir, backend: backend, sessions: make(map[int]*session.Session), - serveErrCh: make(chan error), + serveErrCh: queue.NewQueuedChannel[error](1, 1), serveDoneCh: make(chan struct{}), inLogger: builder.inLogger, outLogger: builder.outLogger, diff --git a/server.go b/server.go index da4ddbaa..89a8d2ee 100644 --- a/server.go +++ b/server.go @@ -18,6 +18,7 @@ import ( "github.com/ProtonMail/gluon/events" "github.com/ProtonMail/gluon/internal" "github.com/ProtonMail/gluon/internal/backend" + "github.com/ProtonMail/gluon/internal/queue" "github.com/ProtonMail/gluon/internal/session" "github.com/ProtonMail/gluon/profiling" "github.com/ProtonMail/gluon/reporter" @@ -40,7 +41,7 @@ type Server struct { sessionsLock sync.RWMutex // serveErrCh collects errors encountered while serving. - serveErrCh chan error + serveErrCh *queue.QueuedChannel[error] // serveDoneCh is used to stop the server. serveDoneCh chan struct{} @@ -199,7 +200,7 @@ func (s *Server) serve(ctx context.Context, connCh <-chan net.Conn) { pprof.Do(ctx, labels, func(ctx context.Context) { if err := session.Serve(ctx); err != nil { if !errors.Is(err, net.ErrClosed) { - s.serveErrCh <- err + s.serveErrCh.Enqueue(err) } } }) @@ -210,7 +211,7 @@ func (s *Server) serve(ctx context.Context, connCh <-chan net.Conn) { // GetErrorCh returns the error channel. func (s *Server) GetErrorCh() <-chan error { - return s.serveErrCh + return s.serveErrCh.GetChannel() } func (s *Server) GetVersionInfo() internal.VersionInfo { @@ -246,7 +247,7 @@ func (s *Server) Close(ctx context.Context) error { } // Close the server error channel. - close(s.serveErrCh) + s.serveErrCh.Close() // Close any watchers. for _, watcher := range s.watchers {