From 19b8f7b4e7b06df9229e222094a460e73df21963 Mon Sep 17 00:00:00 2001 From: Leander Beernaert Date: Wed, 29 Mar 2023 14:23:38 +0200 Subject: [PATCH] fix(GODT-2526): Unlimited memory usage during fetch/search This bug was introduced when we switch for a queued channel to handle the client's responses. This had the unfortunate side effect that if the responses would arrive faster than they can be sent to client they can pile up. When this is combined with fetch queries that read the message literal, an unhealthy amount of RAM can be consumed by this. --- internal/session/handle.go | 15 ++------------- internal/session/session.go | 10 +++++++++- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/internal/session/handle.go b/internal/session/handle.go index 4bf1659f..6935fc03 100644 --- a/internal/session/handle.go +++ b/internal/session/handle.go @@ -8,7 +8,6 @@ import ( "github.com/ProtonMail/gluon/internal/response" "github.com/ProtonMail/gluon/internal/state" "github.com/ProtonMail/gluon/logging" - "github.com/ProtonMail/gluon/queue" ) func (s *Session) handleOther( @@ -16,17 +15,7 @@ func (s *Session) handleOther( tag string, cmd command.Payload, ) <-chan response.Response { - resCh := make(chan response.Response) - - outCh := queue.NewQueuedChannel[response.Response](0, 0) - - go func() { - defer outCh.Close() - - for res := range resCh { - outCh.Enqueue(res) - } - }() + resCh := make(chan response.Response, 8) s.handleWG.Go(func() { logging.DoAnnotated(state.NewStateContext(ctx, s.state), func(ctx context.Context) { @@ -45,7 +34,7 @@ func (s *Session) handleOther( }) }) - return outCh.GetChannel() + return resCh } // handleCommand returns a response instance if a command needs to force an exit of the client. diff --git a/internal/session/session.go b/internal/session/session.go index 19d649f1..b8c1611e 100644 --- a/internal/session/session.go +++ b/internal/session/session.go @@ -213,8 +213,16 @@ func (s *Session) serve(ctx context.Context) error { } default: - for res := range s.handleOther(withStartTime(ctx, time.Now()), res.command.Tag, cmd) { + respCh := s.handleOther(withStartTime(ctx, time.Now()), res.command.Tag, cmd) + for res := range respCh { if err := res.Send(s); err != nil { + go func() { + for range respCh { + // Consume all invalid input on error that is still being produced by the ongoing + // command. + } + }() + return fmt.Errorf("failed to send response to client: %w", err) } }