Skip to content

Commit

Permalink
fix: Integrate goroutine leak detector and fix reports
Browse files Browse the repository at this point in the history
This patch integrates the goleak goroutine leak detector which is run
with each server instantiation in the unit tests. We also address the
currently detected goroutines that did not exit properly.

Added `Close()` to `Connector` to ensure the update channels can be
closed properly.

There is currently an issue with double invocation of `Connector.Close()`
which will be addressed with GODT-1647.

Finally we also ensure we wait that the goroutine launched with
`user.forward()` finishes execution before we return. Since we can now
properly wait on this, we no longer require a dedicated stop channel to
terminate the goroutine launched in `newUser()`.
  • Loading branch information
LBeernaertProton committed Jun 29, 2022
1 parent 77ef5f8 commit eb48fa7
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 30 deletions.
3 changes: 3 additions & 0 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,7 @@ type Connector interface {

// Resume resumes the stream of updates.
Resume()

// Close the connector will no longer be used and all resources should be closed/released.
Close(ctx context.Context) error
}
11 changes: 11 additions & 0 deletions connector/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,17 @@ func (conn *Dummy) Sync(ctx context.Context) error {
return nil
}

func (conn *Dummy) Close(ctx context.Context) error {
//TODO: GODT-1647 fix double call to Close().
if conn.updateCh != nil {
close(conn.updateCh)
conn.updateCh = nil
conn.ticker.Stop()
}

return nil
}

func (conn *Dummy) GetLastRecordedIMAPID() imap.ID {
return conn.state.lastIMAPID
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/mattn/go-sqlite3 v1.14.10
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.1-0.20210427113832-6241f9ab9942
go.uber.org/goleak v1.1.12
golang.org/x/exp v0.0.0-20220518171630-0b5c67f07fdf
golang.org/x/text v0.3.7
google.golang.org/protobuf v1.27.1
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1-0.20210427113832-6241f9ab9942 h1:t0lM6y/M5IiUZyvbBTcngso8SZEZICH7is9B6g/obVU=
github.com/stretchr/testify v1.7.1-0.20210427113832-6241f9ab9942/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/vmihailenco/msgpack v3.3.3+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
Expand All @@ -92,6 +93,8 @@ github.com/zclconf/go-cty v1.2.0/go.mod h1:hOPWgoHbaTUnI5k4D2ld+GRpFJSCe6bCM7m1q
github.com/zclconf/go-cty v1.8.0 h1:s4AvqaeQzJIu3ndv4gVIhplVD0krU+bgrcLSVUnaWuA=
github.com/zclconf/go-cty v1.8.0/go.mod h1:vVKLxnk3puL4qRAv72AO+W99LUD4da90g3uUAzyuvAk=
github.com/zclconf/go-cty-debug v0.0.0-20191215020915-b22d67c1ba0b/go.mod h1:ZRKQfBXbGkpdV6QMzT3rU1kSTAnfu1dO8dPKjYprgj8=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand All @@ -104,6 +107,8 @@ golang.org/x/exp v0.0.0-20220518171630-0b5c67f07fdf h1:oXVg4h2qJDd9htKxb5SCpFBHL
golang.org/x/exp v0.0.0-20220518171630-0b5c67f07fdf/go.mod h1:yh0Ynu2b5ZUe3MQfp2nM0ecK7wsgouWTDN0FNeJuIys=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20200801112145-973feb4309de/go.mod h1:skQtrUTUwhdJvXM/2KKJzY8pDgNr9I/FOMqDVRPBUS4=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
Expand Down Expand Up @@ -142,10 +147,13 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.10 h1:QjFRCZxdOhBJ/UNgnBZLbNV13DlbnK0quyivTnXJM20=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
43 changes: 18 additions & 25 deletions internal/backend/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ type user struct {
statesLock sync.RWMutex
nextStateID int

updateStopCh chan struct{}
updateWG sync.WaitGroup
updateWG sync.WaitGroup
}

func newUser(ctx context.Context, userID string, client *ent.Client, remote *remote.User, store store.Store, delimiter string) (*user, error) {
Expand All @@ -35,13 +34,12 @@ func newUser(ctx context.Context, userID string, client *ent.Client, remote *rem
}

user := &user{
userID: userID,
remote: remote,
store: store,
delimiter: delimiter,
client: client,
states: make(map[int]*State),
updateStopCh: make(chan struct{}),
userID: userID,
remote: remote,
store: store,
delimiter: delimiter,
client: client,
states: make(map[int]*State),
}

if err := user.deleteAllMessagesMarkedDeleted(ctx); err != nil {
Expand All @@ -53,17 +51,13 @@ func newUser(ctx context.Context, userID string, client *ent.Client, remote *rem
go func() {
defer user.updateWG.Done()

for {
select {
case update := <-remote.GetUpdates():
if err := user.tx(context.Background(), func(tx *ent.Tx) error {
defer update.Done()
return user.apply(context.Background(), tx, update)
}); err != nil {
logrus.WithError(err).Errorf("Failed to apply update: %v", update)
}
case <-user.updateStopCh:
return
for update := range remote.GetUpdates() {
update := update
if err := user.tx(context.Background(), func(tx *ent.Tx) error {
defer update.Done()
return user.apply(context.Background(), tx, update)
}); err != nil {
logrus.WithError(err).Errorf("Failed to apply update: %v", update)
}
}
}()
Expand Down Expand Up @@ -110,14 +104,13 @@ func (user *user) tx(ctx context.Context, fn func(tx *ent.Tx) error) error {
func (user *user) close(ctx context.Context) error {
user.closeStates()

// Wait until the connector update go routine has finished.
close(user.updateStopCh)
user.updateWG.Wait()

if err := user.remote.CloseAndSerializeOperationQueue(); err != nil {
if err := user.remote.CloseAndSerializeOperationQueue(ctx); err != nil {
return fmt.Errorf("failed to close user remote: %w", err)
}

// Wait until the connector update go routine has finished.
user.updateWG.Wait()

if err := user.client.Close(); err != nil {
return fmt.Errorf("failed to close user client: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion internal/remote/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func (m *Manager) RemoveUser(ctx context.Context, userID string) error {
return ErrNoSuchUser
}

user.CloseAndFlushOperationQueue()
if err := user.CloseAndFlushOperationQueue(ctx); err != nil {
return err
}

path, err := m.getQueuePath(userID)
if err != nil {
Expand Down
36 changes: 34 additions & 2 deletions internal/remote/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package remote

import (
"bytes"
"context"
"encoding/gob"
"errors"
"fmt"
Expand Down Expand Up @@ -36,6 +37,8 @@ type User struct {

// processWG is used to ensure we wait until the process goroutine has finished executing after we close the queue.
processWG sync.WaitGroup
// forwardWG is used to ensure we wait until the forward() goroutine has finished executing.
forwardWG sync.WaitGroup
}

// newUser constructs a new user with the given (IMAP) credentials.
Expand All @@ -57,6 +60,8 @@ func newUser(userID, path string, conn connector.Connector) (*User, error) {
}

// send connector updates along to the mailserver.
user.forwardWG.Add(1)

go user.forward(conn.GetUpdates())

user.processWG.Add(1)
Expand All @@ -71,11 +76,23 @@ func (user *User) GetUpdates() <-chan imap.Update {
return user.updatesCh
}

func (user *User) CloseAndFlushOperationQueue() {
func (user *User) CloseAndFlushOperationQueue(ctx context.Context) error {
user.opQueue.closeQueue()

// Wait until any remaining operations popped by the process go routine finish executing
user.processWG.Wait()

if err := user.conn.Close(ctx); err != nil {
return err
}

//TODO: GODT-1647 fix double call to Close().
if user.updatesCh != nil {
user.forwardWG.Wait()
user.updatesCh = nil
}

return nil
}

func (user *User) FinishMailboxIDUpdate(tempID string) error {
Expand All @@ -91,7 +108,17 @@ func (user *User) FinishMessageIDUpdate(tempID string) error {
}

// CloseAndSerializeOperationQueue closes the remote user.
func (user *User) CloseAndSerializeOperationQueue() error {
func (user *User) CloseAndSerializeOperationQueue(ctx context.Context) error {
if err := user.conn.Close(ctx); err != nil {
return err
}

//TODO: GODT-1647 fix double call to Close().
if user.updatesCh != nil {
user.forwardWG.Wait()
user.updatesCh = nil
}

ops, err := user.opQueue.closeQueueAndRetrieveRemaining()
if err != nil {
return fmt.Errorf("failed to close queue: %w", err)
Expand Down Expand Up @@ -126,6 +153,11 @@ func (user *User) CloseAndSerializeOperationQueue() error {

// forward pulls updates off the stream and forwards them to the outgoing update channel.
func (user *User) forward(updateCh <-chan imap.Update) {
defer func() {
close(user.updatesCh)
user.forwardWG.Done()
}()

for update := range updateCh {
user.send(update)
}
Expand Down
2 changes: 2 additions & 0 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ func (s *Session) logOutgoing(line string) {
}

func (s *Session) done(ctx context.Context) {
close(s.eventCh)

if s.state != nil {
if err := s.state.Close(ctx); err != nil {
logrus.WithError(err).Error("Failed to close state")
Expand Down
11 changes: 9 additions & 2 deletions tests/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

const defaultPeriod = time.Second
Expand Down Expand Up @@ -108,6 +109,12 @@ func runServer(tb testing.TB, creds []credentials, delim string, tests func(*tes

// runServerWithPaths initializes and starts the mailserver using a pathGenerator.
func runServerWithPaths(tb testing.TB, creds []credentials, delim string, pathGenerator pathGenerator, tests func(*testSession)) {
loggerIn := logrus.StandardLogger().WriterLevel(logrus.TraceLevel)
loggerOut := logrus.StandardLogger().WriterLevel(logrus.TraceLevel)

// Setup goroutine leak detector here so that it doesn't report the goroutines created by logrus.
defer goleak.VerifyNone(tb, goleak.IgnoreCurrent())

gluonPath := pathGenerator.GenerateBackendPath()
logrus.Tracef("Backend Path: %v", gluonPath)
server, err := gluon.New(
Expand All @@ -118,8 +125,8 @@ func runServerWithPaths(tb testing.TB, creds []credentials, delim string, pathGe
MinVersion: tls.VersionTLS13,
}),
gluon.WithLogger(
logrus.StandardLogger().WriterLevel(logrus.TraceLevel),
logrus.StandardLogger().WriterLevel(logrus.TraceLevel),
loggerIn,
loggerOut,
),
gluon.WithVersionInfo(TestServerVersionInfo.Version.Major, TestServerVersionInfo.Version.Minor, TestServerVersionInfo.Version.Patch,
TestServerVersionInfo.Name, TestServerVersionInfo.Vendor, TestServerVersionInfo.SupportURL),
Expand Down

0 comments on commit eb48fa7

Please sign in to comment.