Skip to content

Commit

Permalink
fix(GODT-1757): Flag updates can only be applied through responders
Browse files Browse the repository at this point in the history
We should only apply flag updates through responders as it possible to
run into a sequence of events where an exist responder hasn't been
processed yet. Attempting to apply the flag changes during a state
update will result in an error or no op.

The error was not reproducible at the moment since the state filter
would fail to queue the update as the message did not exist yet.

Finally the whole issue is currently masked since we get the new
message for the snap shot directly from the database. I ran into this
issue with local changes where this was no longer the case.
  • Loading branch information
LBeernaertProton committed Sep 7, 2022
1 parent 1cf7ad5 commit 81b1a3a
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 85 deletions.
41 changes: 33 additions & 8 deletions internal/state/responders.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,30 @@ func (u *expunge) String() string {
)
}

const (
FetchFlagOpAdd = iota
FetchFlagOpRem
FetchFlagOpSet
)

type fetch struct {
messageID imap.InternalMessageID
flags imap.FlagSet

asUID bool
asSilent bool
fetchFlagOp int
asUID bool
asSilent bool
cameFromDifferentMailbox bool
}

func NewFetch(messageID imap.InternalMessageID, flags imap.FlagSet, asUID, asSilent bool) *fetch {
func NewFetch(messageID imap.InternalMessageID, flags imap.FlagSet, asUID, asSilent, cameFromDifferentMailbox bool, fetchFlagOp int) *fetch {
return &fetch{
messageID: messageID,
flags: flags,
asUID: asUID,
asSilent: asSilent,
messageID: messageID,
flags: flags,
asUID: asUID,
asSilent: asSilent,
fetchFlagOp: fetchFlagOp,
cameFromDifferentMailbox: cameFromDifferentMailbox,
}
}

Expand All @@ -181,7 +191,22 @@ func (u *fetch) handle(ctx context.Context, tx *ent.Tx, snap *snapshot) ([]respo
}

// Set the new flags as per the fetch response (recent flag is preserved).
if err := snap.setMessageFlags(u.messageID, u.flags); err != nil {
var newMessageFlags imap.FlagSet

switch u.fetchFlagOp {
case FetchFlagOpAdd:
newMessageFlags = curFlags.AddFlagSet(u.flags)
case FetchFlagOpRem:
newMessageFlags = curFlags.RemoveFlagSet(u.flags)
case FetchFlagOpSet:
newMessageFlags = u.flags
}

if u.cameFromDifferentMailbox {
newMessageFlags = newMessageFlags.Set(imap.FlagDeleted, curFlags.Contains(imap.FlagDeleted))
}

if err := snap.setMessageFlags(u.messageID, newMessageFlags); err != nil {
return nil, err
}

Expand Down
105 changes: 46 additions & 59 deletions internal/state/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,33 @@ type Update interface {
}

type messageFlagsAddedStateUpdate struct {
AnyMessageIDStateFilter
flags imap.FlagSet
mboxID ids.MailboxIDPair
stateID int
AllStateFilter
messageIDs []imap.InternalMessageID
flags imap.FlagSet
mboxID ids.MailboxIDPair
stateID int
}

func newMessageFlagsAddedStateUpdate(flags imap.FlagSet, mboxID ids.MailboxIDPair, messageIDs []imap.InternalMessageID, stateID int) Update {
return &messageFlagsAddedStateUpdate{
flags: flags,
mboxID: mboxID,
AnyMessageIDStateFilter: AnyMessageIDStateFilter{MessageIDs: messageIDs},
stateID: stateID,
flags: flags,
mboxID: mboxID,
messageIDs: messageIDs,
stateID: stateID,
}
}

func (u *messageFlagsAddedStateUpdate) Apply(ctx context.Context, tx *ent.Tx, s *State) error {
for _, messageID := range u.MessageIDs {
snapFlags, err := s.snap.getMessageFlags(messageID)
if err != nil {
return err
}

newFlags := snapFlags.AddFlagSet(u.flags)

if s.snap.mboxID != u.mboxID {
newFlags = newFlags.Set(imap.FlagDeleted, snapFlags.Contains(imap.FlagDeleted))
}
for _, messageID := range u.messageIDs {
newFlags := u.flags

if err := s.PushResponder(ctx, tx, NewFetch(
messageID,
newFlags,
contexts.IsUID(ctx),
s.StateID == u.stateID && contexts.IsSilent(ctx),
s.snap.mboxID != u.mboxID,
FetchFlagOpAdd,
)); err != nil {
return err
}
Expand All @@ -66,7 +60,7 @@ func (u *messageFlagsAddedStateUpdate) Apply(ctx context.Context, tx *ent.Tx, s
func (u *messageFlagsAddedStateUpdate) String() string {
return fmt.Sprintf("MessagFlagsAddedStateUpdate: mbox = %v messages = %v flags = %v",
u.mboxID.InternalID.ShortID(),
xslices.Map(u.MessageIDs, func(id imap.InternalMessageID) string {
xslices.Map(u.messageIDs, func(id imap.InternalMessageID) string {
return id.ShortID()
}),
u.flags)
Expand Down Expand Up @@ -114,39 +108,33 @@ func (state *State) applyMessageFlagsAdded(ctx context.Context, tx *ent.Tx, mess
}

type messageFlagsRemovedStateUpdate struct {
AnyMessageIDStateFilter
flags imap.FlagSet
mboxID ids.MailboxIDPair
stateID int
AllStateFilter
messageIDs []imap.InternalMessageID
flags imap.FlagSet
mboxID ids.MailboxIDPair
stateID int
}

func NewMessageFlagsRemovedStateUpdate(flags imap.FlagSet, mboxID ids.MailboxIDPair, messageIDs []imap.InternalMessageID, stateID int) Update {
return &messageFlagsRemovedStateUpdate{
flags: flags,
mboxID: mboxID,
AnyMessageIDStateFilter: AnyMessageIDStateFilter{MessageIDs: messageIDs},
stateID: stateID,
flags: flags,
mboxID: mboxID,
messageIDs: messageIDs,
stateID: stateID,
}
}

func (u *messageFlagsRemovedStateUpdate) Apply(ctx context.Context, tx *ent.Tx, s *State) error {
for _, messageID := range u.MessageIDs {
snapFlags, err := s.snap.getMessageFlags(messageID)
if err != nil {
return err
}

newFlags := snapFlags.RemoveFlagSet(u.flags)

if s.snap.mboxID != u.mboxID {
newFlags = newFlags.Set(imap.FlagDeleted, snapFlags.Contains(imap.FlagDeleted))
}
for _, messageID := range u.messageIDs {
newFlags := u.flags

if err := s.PushResponder(ctx, tx, NewFetch(
messageID,
newFlags,
contexts.IsUID(ctx),
s.StateID == u.stateID && contexts.IsSilent(ctx),
s.snap.mboxID != u.mboxID,
FetchFlagOpRem,
)); err != nil {
return err
}
Expand All @@ -158,7 +146,7 @@ func (u *messageFlagsRemovedStateUpdate) Apply(ctx context.Context, tx *ent.Tx,
func (u *messageFlagsRemovedStateUpdate) String() string {
return fmt.Sprintf("MessagFlagsRemovedStateUpdate: mbox = %v messages = %v flags = %v",
u.mboxID.InternalID,
xslices.Map(u.MessageIDs, func(id imap.InternalMessageID) string {
xslices.Map(u.messageIDs, func(id imap.InternalMessageID) string {
return id.ShortID()
}),
u.flags)
Expand Down Expand Up @@ -212,39 +200,33 @@ func (state *State) applyMessageFlagsRemoved(ctx context.Context, tx *ent.Tx, me
}

type messageFlagsSetStateUpdate struct {
AnyMessageIDStateFilter
flags imap.FlagSet
mboxID ids.MailboxIDPair
stateID int
AllStateFilter
messageIDs []imap.InternalMessageID
flags imap.FlagSet
mboxID ids.MailboxIDPair
stateID int
}

func NewMessageFlagsSetStateUpdate(flags imap.FlagSet, mboxID ids.MailboxIDPair, messageIDs []imap.InternalMessageID, stateID int) Update {
return &messageFlagsSetStateUpdate{
flags: flags,
mboxID: mboxID,
AnyMessageIDStateFilter: AnyMessageIDStateFilter{MessageIDs: messageIDs},
stateID: stateID,
flags: flags,
mboxID: mboxID,
messageIDs: messageIDs,
stateID: stateID,
}
}

func (u *messageFlagsSetStateUpdate) Apply(ctx context.Context, tx *ent.Tx, state *State) error {
for _, messageID := range u.MessageIDs {
for _, messageID := range u.messageIDs {
newFlags := u.flags

if state.snap.mboxID != u.mboxID {
snapFlags, err := state.snap.getMessageFlags(messageID)
if err != nil {
return err
}

newFlags = newFlags.Set(imap.FlagDeleted, snapFlags.Contains(imap.FlagDeleted))
}

if err := state.PushResponder(ctx, tx, NewFetch(
messageID,
newFlags,
contexts.IsUID(ctx),
state.StateID == u.stateID && contexts.IsSilent(ctx),
state.snap.mboxID != u.mboxID,
FetchFlagOpSet,
)); err != nil {
return err
}
Expand All @@ -254,7 +236,12 @@ func (u *messageFlagsSetStateUpdate) Apply(ctx context.Context, tx *ent.Tx, stat
}

func (u *messageFlagsSetStateUpdate) String() string {
return fmt.Sprintf("MessagFlagsSetStateUpdate: mbox = %v messages = %v flags=%v", u.mboxID.InternalID.ShortID(), u.MessageIDs, u.flags)
return fmt.Sprintf("MessagFlagsSetStateUpdate: mbox = %v messages = %v flags=%v",
u.mboxID.InternalID.ShortID(),
xslices.Map(u.messageIDs, func(id imap.InternalMessageID) string {
return id.ShortID()
}),
u.flags)
}

// applyMessageFlagsSet sets the flags of the given messages.
Expand Down
20 changes: 2 additions & 18 deletions internal/state/updates_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package state

import (
"context"
"errors"

"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/internal/contexts"
"github.com/ProtonMail/gluon/internal/db/ent"
Expand All @@ -23,12 +21,7 @@ func NewRemoteAddMessageFlagsStateUpdate(messageID imap.InternalMessageID, flag
}

func (u *RemoteAddMessageFlagsStateUpdate) Apply(ctx context.Context, tx *ent.Tx, s *State) error {
snapFlags, err := s.snap.getMessageFlags(u.MessageID)
if err != nil {
return err
}

return s.PushResponder(ctx, tx, NewFetch(u.MessageID, snapFlags.Add(u.flag), contexts.IsUID(ctx), contexts.IsSilent(ctx)))
return s.PushResponder(ctx, tx, NewFetch(u.MessageID, imap.NewFlagSet(u.flag), contexts.IsUID(ctx), contexts.IsSilent(ctx), false, FetchFlagOpAdd))
}

type RemoteRemoveMessageFlagsStateUpdate struct {
Expand All @@ -44,16 +37,7 @@ func NewRemoteRemoveMessageFlagsStateUpdate(messageID imap.InternalMessageID, fl
}

func (u *RemoteRemoveMessageFlagsStateUpdate) Apply(ctx context.Context, tx *ent.Tx, s *State) error {
snapFlags, err := s.snap.getMessageFlags(u.MessageID)
if err != nil {
if errors.Is(err, ErrNoSuchMessage) {
return nil
}

return err
}

return s.PushResponder(ctx, tx, NewFetch(u.MessageID, snapFlags.Remove(u.flag), contexts.IsUID(ctx), contexts.IsSilent(ctx)))
return s.PushResponder(ctx, tx, NewFetch(u.MessageID, imap.NewFlagSet(u.flag), contexts.IsUID(ctx), contexts.IsSilent(ctx), false, FetchFlagOpRem))
}

type RemoteMessageDeletedStateUpdate struct {
Expand Down

0 comments on commit 81b1a3a

Please sign in to comment.