Skip to content

Commit

Permalink
fix(GODT-1757): Ensure States are properly cleaned up before removing…
Browse files Browse the repository at this point in the history
… user

Make sure the active States for a given user are properly cleaned up
before we close the user to ensure that any pending database operations
take place before the database is closed.
  • Loading branch information
LBeernaertProton committed Aug 30, 2022
1 parent 6347efd commit eca4f83
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 9 deletions.
3 changes: 0 additions & 3 deletions internal/backend/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type State struct {
ro bool

doneCh chan struct{}
stopCh chan struct{}

updatesQueue *queue.QueuedChannel[stateUpdate]

Expand Down Expand Up @@ -375,8 +374,6 @@ func (state *State) Done() <-chan struct{} {
}

func (state *State) Close(ctx context.Context) error {
defer close(state.stopCh)

return state.user.removeState(ctx, state.stateID)
}

Expand Down
7 changes: 7 additions & 0 deletions internal/backend/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type user struct {

updateWG sync.WaitGroup
updateQuitCh chan struct{}

// statesWG is
statesWG sync.WaitGroup
}

func newUser(ctx context.Context, userID string, db *DB, remote *remote.User, store store.Store, delimiter string) (*user, error) {
Expand Down Expand Up @@ -77,6 +80,10 @@ func newUser(ctx context.Context, userID string, db *DB, remote *remote.User, st
func (user *user) close(ctx context.Context) error {
user.closeStates()

// Ensure we wait until all states have been removed/closed by any active sessions otherwise we run into issues
// since we close the database in this function.
user.statesWG.Wait()

close(user.updateQuitCh)

// Wait until the connector update go routine has finished.
Expand Down
11 changes: 9 additions & 2 deletions internal/backend/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func (user *user) newState(metadataID remote.ConnMetadataID) (*State, error) {

user.states[user.nextStateID] = newState

user.statesWG.Add(1)

return newState, nil
}

Expand Down Expand Up @@ -64,6 +66,9 @@ func (user *user) removeState(ctx context.Context, stateID int) error {
return err
}

// After this point we need to notify the WaitGroup or we risk deadlocks.
defer user.statesWG.Done()

if err := user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
return DBDeleteMessages(ctx, tx, messageIDs...)
}); err != nil {
Expand Down Expand Up @@ -109,8 +114,10 @@ func (user *user) getStates() []*State {
}

func (user *user) closeStates() {
for _, state := range user.getStates() {
user.statesLock.RLock()
defer user.statesLock.RUnlock()

for _, state := range user.states {
close(state.doneCh)
<-state.stopCh
}
}
11 changes: 7 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type Server struct {

// versionInfo holds info about the Gluon version.
versionInfo internal.VersionInfo

connectionWG sync.WaitGroup
}

// New creates a new server with the given options.
Expand Down Expand Up @@ -152,19 +154,18 @@ func (s *Server) Serve(ctx context.Context, l net.Listener) chan error {
s.addListener(l)
defer s.removeListener(l)

var wg sync.WaitGroup
defer wg.Wait()
defer s.connectionWG.Wait()

for {
conn, err := l.Accept()
if err != nil {
return
}

wg.Add(1)
s.connectionWG.Add(1)

go func() {
defer wg.Done()
defer s.connectionWG.Done()
s.handleConn(ctx, conn, errCh)
}()
}
Expand All @@ -180,6 +181,8 @@ func (s *Server) Close(ctx context.Context) error {
s.removeListener(l)
}

s.connectionWG.Wait()

if err := s.backend.Close(ctx); err != nil {
return fmt.Errorf("failed to close backend: %w", err)
}
Expand Down

0 comments on commit eca4f83

Please sign in to comment.