Skip to content

Commit

Permalink
fix: Connector Message Create event handling
Browse files Browse the repository at this point in the history
The previous code did not properly handle message de-duplication code
resulting in scenarios where duplicate message entries could be created.
  • Loading branch information
LBeernaertProton authored and jameshoulahan committed Sep 14, 2022
1 parent b1e5abe commit 812a272
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 158 deletions.
155 changes: 83 additions & 72 deletions internal/backend/connector_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,96 +143,95 @@ func (user *user) applyMailboxIDChanged(ctx context.Context, update *imap.Mailbo

// applyMessagesCreated applies a MessagesCreated update.
func (user *user) applyMessagesCreated(ctx context.Context, update *imap.MessagesCreated) error {
var updates []*imap.MessageCreated
// collect all unique messages to create
messagesToCreate := make([]*db.CreateMessageReq, 0, len(update.Messages))
messagesToCreateFilter := make(map[imap.MessageID]imap.InternalMessageID, len(update.Messages)/2)
messageForMBox := make(map[imap.InternalMailboxID][]imap.InternalMessageID)
mboxInternalIDMap := make(map[imap.LabelID]imap.InternalMailboxID)

if err := user.db.Read(ctx, func(ctx context.Context, client *ent.Client) error {
for _, update := range update.Messages {
if exists, err := db.MessageExistsWithRemoteID(ctx, client, update.Message.ID); err != nil {
return err
} else if !exists {
updates = append(updates, update)
}
}

return nil
}); err != nil {
return err
}

remoteMessageRequestsMap := make(map[imap.MessageID]*db.CreateMessageReq, len(updates))

const maxUpdateChunk = 1000

chunkedUpdates := xslices.Chunk(updates, maxUpdateChunk)

for _, chunk := range chunkedUpdates {
messagesForMailboxes := make(map[imap.InternalMailboxID][]*db.CreateMessageReq)

if err := db.WriteAndStore(ctx, user.db, user.store, func(ctx context.Context, tx *ent.Tx, storeTx store.Transaction) error {
for _, update := range chunk {

var request *db.CreateMessageReq

// Do not reprocess the message if we have previously processed it.
if req, ok := remoteMessageRequestsMap[update.Message.ID]; ok {
request = req
for _, message := range update.Messages {
internalID, ok := messagesToCreateFilter[message.Message.ID]
if !ok {
_, err := db.GetMessageIDFromRemoteID(ctx, client, message.Message.ID)
if ent.IsNotFound(err) {
internalID = imap.InternalMessageID(uuid.NewString())
} else {
return err
}

internalID := uuid.NewString()

literal, err := rfc822.SetHeaderValue(update.Literal, ids.InternalIDKey, internalID)
if err != nil {
return fmt.Errorf("failed to set internal ID: %w", err)
}

if err := storeTx.Set(imap.InternalMessageID(internalID), literal); err != nil {
return fmt.Errorf("failed to store message literal: %w", err)
}

request = &db.CreateMessageReq{
Message: update.Message,
Literal: literal,
Body: update.ParsedMessage.Body,
Structure: update.ParsedMessage.Structure,
Envelope: update.ParsedMessage.Envelope,
InternalID: imap.InternalMessageID(internalID),
}
literal, err := rfc822.SetHeaderValue(message.Literal, ids.InternalIDKey, string(internalID))
if err != nil {
return fmt.Errorf("failed to set internal ID: %w", err)
}

remoteMessageRequestsMap[update.Message.ID] = request
request := &db.CreateMessageReq{
Message: message.Message,
Literal: literal,
Body: message.ParsedMessage.Body,
Structure: message.ParsedMessage.Structure,
Envelope: message.ParsedMessage.Envelope,
InternalID: internalID,
}

for _, mbox := range update.MailboxIDs {
internalMailboxID, err := db.GetMailboxIDWithRemoteID(ctx, tx.Client(), mbox)
messagesToCreate = append(messagesToCreate, request)
messagesToCreateFilter[message.Message.ID] = internalID
}

for _, mboxID := range message.MailboxIDs {
v, ok := mboxInternalIDMap[mboxID]
if !ok {
internalMBoxID, err := db.GetMailboxIDWithRemoteID(ctx, client, mboxID)
if err != nil {
return err
}

existingRequests := messagesForMailboxes[internalMailboxID]

var alreadyPresent bool

for _, e := range existingRequests {
if e.Message.ID == request.Message.ID {
alreadyPresent = true
break
}
}
v = internalMBoxID
mboxInternalIDMap[mboxID] = v
}

if alreadyPresent {
continue
}
messageList, ok := messageForMBox[v]
if !ok {
messageList = []imap.InternalMessageID{}
messageForMBox[v] = messageList
}

messagesForMailboxes[internalMailboxID] = append(messagesForMailboxes[internalMailboxID], request)
if !slices.Contains(messageList, internalID) {
messageList = append(messageList, internalID)
messageForMBox[v] = messageList
}
}
}
return nil
}); err != nil {
return err
}

for mbox, request := range messagesForMailboxes {
result, err := db.CreateAndAddMessagesToMailbox(ctx, tx, mbox, request)
if len(messagesToCreate) == 0 && len(messageForMBox) == 0 {
return nil
}

const maxUpdateChunk = 1000
// We sadly have to split this up into two separate transactions where we create the messages and one where we
// assign them to the mailbox. There's an upper limit to the number of items badger can track in one transaction.
// This way we can keep the database consistent.
for _, chunk := range xslices.Chunk(messagesToCreate, maxUpdateChunk) {
if err := db.WriteAndStore(ctx, user.db, user.store, func(ctx context.Context, tx *ent.Tx, storeTx store.Transaction) error {
// Create messages in the store
for _, msg := range chunk {
literalWithHeader, err := rfc822.SetHeaderValue(msg.Literal, ids.InternalIDKey, string(msg.InternalID))
if err != nil {
return err
return fmt.Errorf("failed to set internal ID: %w", err)
}

if err := storeTx.Set(msg.InternalID, literalWithHeader); err != nil {
return fmt.Errorf("failed to store message literal: %w", err)
}
}

user.queueStateUpdate(state.NewExistsStateUpdate(mbox, result, nil))
// Create message in the database
if _, err := db.CreateMessages(ctx, tx, chunk...); err != nil {
return err
}

return nil
Expand All @@ -241,7 +240,19 @@ func (user *user) applyMessagesCreated(ctx context.Context, update *imap.Message
}
}

return nil
return user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
// Assign all the messages to the mailbox
for mboxID, msgList := range messageForMBox {
for _, chunk := range xslices.Chunk(msgList, maxUpdateChunk) {
if _, err := user.applyMessagesAddedToMailbox(ctx, tx, mboxID, chunk); err != nil {
return err
}
}

}

return nil
})
}

// applyMessageLabelsUpdated applies a MessageLabelsUpdated update.
Expand Down
90 changes: 4 additions & 86 deletions internal/db/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,87 +150,6 @@ type CreateAndAddMessagesResult struct {
MessageID ids.MessageIDPair
}

func CreateAndAddMessagesToMailbox(ctx context.Context, tx *ent.Tx, mboxID imap.InternalMailboxID, requests []*CreateMessageReq) ([]CreateAndAddMessagesResult, error) {
mbox, err := tx.Mailbox.Query().Where(mailbox.ID(mboxID)).Select(mailbox.FieldID, mailbox.FieldUIDNext).Only(ctx)
if err != nil {
return nil, err
}

msgBuilders := make([]*ent.MessageCreate, 0, len(requests))
flags := make([][]*ent.MessageFlag, 0, len(requests))

for _, request := range requests {
builders := xslices.Map(request.Message.Flags.ToSlice(), func(flag string) *ent.MessageFlagCreate {
return tx.MessageFlag.Create().SetValue(flag)
})

entFlags, err := tx.MessageFlag.CreateBulk(builders...).Save(ctx)
if err != nil {
return nil, err
}

flags = append(flags, entFlags)

msgCreate := tx.Message.Create().
SetID(request.InternalID).
SetDate(request.Message.Date).
SetBody(request.Body).
SetBodyStructure(request.Structure).
SetEnvelope(request.Envelope).
SetSize(len(request.Literal)).
AddFlags(entFlags...)

if len(request.Message.ID) != 0 {
msgCreate = msgCreate.SetRemoteID(request.Message.ID)
}

msgBuilders = append(msgBuilders, msgCreate)
}

messages, err := tx.Message.CreateBulk(msgBuilders...).Save(ctx)
if err != nil {
return nil, err
}

uidBuilders := make([]*ent.UIDCreate, 0, len(requests))

for i, message := range messages {
uidBuilders = append(uidBuilders, tx.UID.Create().
SetMailboxID(mbox.ID).
SetMessageID(message.ID).
SetUID(mbox.UIDNext.Add(uint32(i))),
)
}

uids, err := tx.UID.CreateBulk(uidBuilders...).Save(ctx)
if err != nil {
return nil, err
}

if err := BumpMailboxUIDNext(ctx, tx, mbox, len(requests)); err != nil {
return nil, err
}

result := make([]CreateAndAddMessagesResult, 0, len(requests))

for i := 0; i < len(requests); i++ {
if uids[i].UID != mbox.UIDNext.Add(uint32(i)) {
panic("Invalid UID ")
}

result = append(result, CreateAndAddMessagesResult{
MessageID: ids.MessageIDPair{
InternalID: messages[i].ID,
RemoteID: messages[i].RemoteID,
},
UID: uids[i].UID,
Flags: NewFlagSet(uids[i], flags[i]),
})
}

return result, err
}

func BumpMailboxUIDsForMessage(ctx context.Context, tx *ent.Tx, messageIDs []imap.InternalMessageID, mboxID imap.InternalMailboxID) (map[imap.InternalMessageID]*ent.UID, error) {
messageUIDs := make(map[imap.InternalMessageID]imap.UID)

Expand Down Expand Up @@ -558,12 +477,11 @@ func GetMessageIDsMarkedDeleted(ctx context.Context, client *ent.Client) ([]imap
}

func HasMessageWithID(ctx context.Context, client *ent.Client, id imap.InternalMessageID) (bool, error) {
count, err := client.Message.Query().Where(message.ID(id)).Count(ctx)
if err != nil {
return false, err
}
return client.Message.Query().Where(message.ID(id)).Exist(ctx)
}

return count > 0, nil
func HasMessageWithRemoteID(ctx context.Context, client *ent.Client, id imap.MessageID) (bool, error) {
return client.Message.Query().Where(message.RemoteID(id)).Exist(ctx)
}

func GetMessageIDFromRemoteID(ctx context.Context, client *ent.Client, id imap.MessageID) (imap.InternalMessageID, error) {
Expand Down
12 changes: 12 additions & 0 deletions tests/updates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,15 @@ func TestMessageAddWithSameID(t *testing.T) {
c.OK("A003")
})
}

func TestBatchMessageAddedWithMultipleFlags(t *testing.T) {
runOneToOneTestWithAuth(t, defaultServerOptions(t), func(c *testConnection, s *testSession) {
mailboxID := s.mailboxCreated("user", []string{"mbox"})
flags := []string{imap.FlagFlagged, imap.FlagDraft, "\\foo", "\\bar", imap.AttrMarked}
s.batchMessageCreated("user", mailboxID, 2, func(i int) ([]byte, []string) {
return []byte("to: 1@1.com"), flags
})

s.flush("user")
})
}

0 comments on commit 812a272

Please sign in to comment.