Skip to content

Commit

Permalink
fix(GODT-2202): Report update errors to connector
Browse files Browse the repository at this point in the history
When applying updates from the connector report the error, if any,
via the channel the connector is using to wait for the update to complete
its execution.

Extend the dummy connector with an option to allow updates to fail as it
is impossible to avoid these under certain circumstances.
  • Loading branch information
LBeernaertProton committed Jan 13, 2023
1 parent 07a5a07 commit 7dc070e
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 61 deletions.
39 changes: 36 additions & 3 deletions connector/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"bytes"
"context"
"errors"
"fmt"
"github.com/sirupsen/logrus"
"sync"
"sync/atomic"
"time"

"github.com/ProtonMail/gluon/constants"
Expand Down Expand Up @@ -57,6 +60,8 @@ type Dummy struct {
uidValidity imap.UID

allowMessageCreateWithUnknownMailboxID bool

updatesAllowedToFail int32
}

func NewDummy(usernames []string, password []byte, period time.Duration, flags, permFlags, attrs imap.FlagSet) *Dummy {
Expand All @@ -77,7 +82,16 @@ func NewDummy(usernames []string, password []byte, period time.Duration, flags,
go func() {
conn.ticker.Tick(func(time.Time) {
for _, update := range conn.popUpdates() {
defer update.Wait()
defer func() {
err, ok := update.Wait()
if ok && err != nil {
if atomic.LoadInt32(&conn.updatesAllowedToFail) == 0 {
panic(fmt.Sprintf("Failed to apply update %v: %v", update.String(), err))
} else {
logrus.Errorf("Failed to apply update %v: %v", update.String(), err)
}
}
}()

select {
case conn.updateCh <- update:
Expand Down Expand Up @@ -263,9 +277,13 @@ func (conn *Dummy) SetUIDValidity(newUIDValidity imap.UID) error {
func (conn *Dummy) Sync(ctx context.Context) error {
for _, mailbox := range conn.state.getMailboxes() {
update := imap.NewMailboxCreated(mailbox)
defer update.WaitContext(ctx)

conn.updateCh <- update

err, ok := update.WaitContext(ctx)
if ok && err != nil {
return fmt.Errorf("failed to apply update %v:%w", update.String(), err)
}
}

var updates []*imap.MessageCreated
Expand All @@ -280,10 +298,14 @@ func (conn *Dummy) Sync(ctx context.Context) error {
}

update := imap.NewMessagesCreated(conn.allowMessageCreateWithUnknownMailboxID, updates...)
defer update.WaitContext(ctx)

conn.updateCh <- update

err, ok := update.WaitContext(ctx)
if ok && err != nil {
return fmt.Errorf("failed to apply update %v:%w", update.String(), err)
}

return nil
}

Expand Down Expand Up @@ -419,3 +441,14 @@ func (conn *Dummy) validateName(name []string) (bool, error) {

return exclusive, nil
}

func (conn *Dummy) SetUpdatesAllowedToFail(value bool) {
var v int32
if value {
v = 1
} else {
v = 0
}

atomic.StoreInt32(&conn.updatesAllowedToFail, v)
}
2 changes: 1 addition & 1 deletion connector/dummy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestDummyConnector_validateUpdate(t *testing.T) {

go func() {
for update := range conn.GetUpdates() {
update.Done()
update.Done(nil)
}
}()

Expand Down
29 changes: 18 additions & 11 deletions imap/update_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,43 @@ import (

type Waiter interface {
// Wait waits until the update has been marked as done.
Wait()
Wait() (error, bool)

// WaitContext waits until the update has been marked as done or the context is cancelled.
WaitContext(context.Context)
WaitContext(context.Context) (error, bool)

// Done marks the update as done.
Done()
// Done marks the update as done and report an error (if any).
Done(error)
}

type updateWaiter struct {
waitCh chan struct{}
waitCh chan error
}

func newUpdateWaiter() *updateWaiter {
return &updateWaiter{
waitCh: make(chan struct{}),
waitCh: make(chan error, 1),
}
}

func (w *updateWaiter) Wait() {
<-w.waitCh
func (w *updateWaiter) Wait() (error, bool) {
err, ok := <-w.waitCh
return err, ok
}

func (w *updateWaiter) WaitContext(ctx context.Context) {
func (w *updateWaiter) WaitContext(ctx context.Context) (error, bool) {
select {
case <-ctx.Done():
case <-w.waitCh:
return nil, false
case err, ok := <-w.waitCh:
return err, ok
}
}

func (w *updateWaiter) Done() {
func (w *updateWaiter) Done(err error) {
if err != nil {
w.waitCh <- err
}

close(w.waitCh)
}
84 changes: 44 additions & 40 deletions internal/backend/connector_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,50 +19,54 @@ import (

// apply an incoming update originating from the connector.
func (user *user) apply(ctx context.Context, update imap.Update) error {
defer update.Done()

logrus.WithField("update", update).WithField("user-id", user.userID).Debug("Applying update")

switch update := update.(type) {
case *imap.MailboxCreated:
return user.applyMailboxCreated(ctx, update)
err := func() error {
switch update := update.(type) {
case *imap.MailboxCreated:
return user.applyMailboxCreated(ctx, update)

case *imap.MailboxDeleted:
return user.applyMailboxDeleted(ctx, update)
case *imap.MailboxDeleted:
return user.applyMailboxDeleted(ctx, update)

case *imap.MailboxUpdated:
return user.applyMailboxUpdated(ctx, update)
case *imap.MailboxUpdated:
return user.applyMailboxUpdated(ctx, update)

case *imap.MailboxIDChanged:
return user.applyMailboxIDChanged(ctx, update)
case *imap.MailboxIDChanged:
return user.applyMailboxIDChanged(ctx, update)

case *imap.MessagesCreated:
return user.applyMessagesCreated(ctx, update)
case *imap.MessagesCreated:
return user.applyMessagesCreated(ctx, update)

case *imap.MessageMailboxesUpdated:
return user.applyMessageMailboxesUpdated(ctx, update)
case *imap.MessageMailboxesUpdated:
return user.applyMessageMailboxesUpdated(ctx, update)

case *imap.MessageFlagsUpdated:
return user.applyMessageFlagsUpdated(ctx, update)
case *imap.MessageFlagsUpdated:
return user.applyMessageFlagsUpdated(ctx, update)

case *imap.MessageIDChanged:
return user.applyMessageIDChanged(ctx, update)
case *imap.MessageIDChanged:
return user.applyMessageIDChanged(ctx, update)

case *imap.MessageDeleted:
return user.applyMessageDeleted(ctx, update)
case *imap.MessageDeleted:
return user.applyMessageDeleted(ctx, update)

case *imap.MessageUpdated:
return user.applyMessageUpdated(ctx, update)
case *imap.MessageUpdated:
return user.applyMessageUpdated(ctx, update)

case *imap.UIDValidityBumped:
return user.applyUIDValidityBumped(ctx, update)
case *imap.UIDValidityBumped:
return user.applyUIDValidityBumped(ctx, update)

case *imap.Noop:
return nil
case *imap.Noop:
return nil

default:
return fmt.Errorf("bad update")
}
default:
return fmt.Errorf("bad update")
}
}()

update.Done(err)

return err
}

// applyMailboxCreated applies a MailboxCreated update.
Expand All @@ -71,25 +75,25 @@ func (user *user) applyMailboxCreated(ctx context.Context, update *imap.MailboxC
return fmt.Errorf("attempting to create protected mailbox (recovery)")
}

if err := user.imapLimits.CheckUIDValidity(user.globalUIDValidity); err != nil {
return err
}

if exists, err := db.ReadResult(ctx, user.db, func(ctx context.Context, client *ent.Client) (bool, error) {
if mailboxCount, err := db.GetMailboxCount(ctx, client); err != nil {
return false, err
} else if err := user.imapLimits.CheckMailBoxCount(mailboxCount); err != nil {
return false, err
}

return db.MailboxExistsWithRemoteID(ctx, client, update.Mailbox.ID)
}); err != nil {
return err
} else if exists {
return nil
}

if err := user.imapLimits.CheckUIDValidity(user.globalUIDValidity); err != nil {
return err
}

return user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
if mailboxCount, err := db.GetMailboxCount(ctx, tx.Client()); err != nil {
return err
} else if err := user.imapLimits.CheckMailBoxCount(mailboxCount); err != nil {
return err
}

if _, err := db.CreateMailbox(
ctx,
tx,
Expand Down
10 changes: 6 additions & 4 deletions internal/backend/update_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func (u *updateInjector) forward(ctx context.Context, updateCh <-chan imap.Updat

for {
select {
case <-ctx.Done():
return
case update, ok := <-updateCh:
if !ok {
return
Expand All @@ -76,14 +78,14 @@ func (u *updateInjector) forward(ctx context.Context, updateCh <-chan imap.Updat
}

// send the update on the updates channel, optionally blocking until it has been processed.
func (u *updateInjector) send(ctx context.Context, update imap.Update, withBlock ...bool) {
func (u *updateInjector) send(ctx context.Context, update imap.Update) {
select {
case <-u.forwardQuitCh:
return

case u.updatesCh <- update:
if len(withBlock) > 0 && withBlock[0] {
update.WaitContext(ctx)
}

case <-ctx.Done():
return
}
}
5 changes: 5 additions & 0 deletions tests/imap_limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestMaxUIDLimitRespected_Append(t *testing.T) {

func TestMaxMessageLimitRespected_Copy(t *testing.T) {
runOneToOneTestClientWithAuth(t, defaultServerOptions(t, withIMAPLimits(testIMAPLimits())), func(client *client.Client, session *testSession) {
session.setUpdatesAllowedToFail("user", true)
require.NoError(t, client.Create("mbox1"))
require.NoError(t, doAppendWithClient(client, "mbox1", "To: Foo@bar.com", time.Now()))
require.NoError(t, doAppendWithClient(client, "INBOX", "To: Bar@bar.com", time.Now()))
Expand All @@ -57,6 +58,7 @@ func TestMaxMessageLimitRespected_Copy(t *testing.T) {

func TestMaxUIDLimitRespected_Copy(t *testing.T) {
runOneToOneTestClientWithAuth(t, defaultServerOptions(t, withIMAPLimits(testIMAPLimits())), func(client *client.Client, session *testSession) {
session.setUpdatesAllowedToFail("user", true)
require.NoError(t, client.Create("mbox1"))
require.NoError(t, doAppendWithClient(client, "mbox1", "To: Foo@bar.com", time.Now()))
require.NoError(t, doAppendWithClient(client, "INBOX", "To: Bar@bar.com", time.Now()))
Expand All @@ -76,6 +78,7 @@ func TestMaxUIDLimitRespected_Copy(t *testing.T) {

func TestMaxMessageLimitRespected_Move(t *testing.T) {
runOneToOneTestClientWithAuth(t, defaultServerOptions(t, withIMAPLimits(testIMAPLimits())), func(client *client.Client, session *testSession) {
session.setUpdatesAllowedToFail("user", true)
require.NoError(t, client.Create("mbox1"))
require.NoError(t, doAppendWithClient(client, "mbox1", "To: Foo@bar.com", time.Now()))
require.NoError(t, doAppendWithClient(client, "INBOX", "To: Bar@bar.com", time.Now()))
Expand All @@ -87,6 +90,7 @@ func TestMaxMessageLimitRespected_Move(t *testing.T) {

func TestMaxUIDLimitRespected_Move(t *testing.T) {
runOneToOneTestClientWithAuth(t, defaultServerOptions(t, withIMAPLimits(testIMAPLimits())), func(client *client.Client, session *testSession) {
session.setUpdatesAllowedToFail("user", true)
require.NoError(t, client.Create("mbox1"))
require.NoError(t, doAppendWithClient(client, "mbox1", "To: Foo@bar.com", time.Now()))
require.NoError(t, doAppendWithClient(client, "INBOX", "To: Bar@bar.com", time.Now()))
Expand All @@ -106,6 +110,7 @@ func TestMaxUIDLimitRespected_Move(t *testing.T) {

func TestMaxUIDValidityLimitRespected(t *testing.T) {
runOneToOneTestClientWithAuth(t, defaultServerOptions(t, withIMAPLimits(testIMAPLimits())), func(client *client.Client, session *testSession) {
session.setUpdatesAllowedToFail("user", true)
require.NoError(t, client.Create("mbox1"))
require.NoError(t, client.Delete("mbox1"))
require.Error(t, client.Create("mbox2"))
Expand Down
7 changes: 5 additions & 2 deletions tests/recent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,12 @@ func TestRecentAppend(t *testing.T) {
}

func TestRecentStore(t *testing.T) {
runManyToOneTestWithAuth(t, defaultServerOptions(t), []int{1, 2}, func(c map[int]*testConnection, _ *testSession) {
runManyToOneTestWithAuth(t, defaultServerOptions(t), []int{1, 2}, func(c map[int]*testConnection, s *testSession) {
mbox, done := c[1].doCreateTempDir()
defer done()
defer func() {
s.flush("user")
done()
}()

// Create a message in mbox.
c[1].doAppend(mbox, `To: 1@pm.me`).expect(`OK`)
Expand Down
6 changes: 6 additions & 0 deletions tests/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type Connector interface {
Sync(context.Context) error

Flush()

SetUpdatesAllowedToFail(bool)
}

type testSession struct {
Expand Down Expand Up @@ -361,6 +363,10 @@ func (s *testSession) flush(user string) {
s.conns[s.userIDs[user]].Flush()
}

func (s *testSession) setUpdatesAllowedToFail(user string, value bool) {
s.conns[s.userIDs[user]].SetUpdatesAllowedToFail(value)
}

func forMessageInMBox(rr io.Reader, fn func(messageDelimiter, literal []byte)) error {
mr := mbox.NewReader(rr)

Expand Down
1 change: 1 addition & 0 deletions tests/updates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func TestBatchMessageAddedWithMultipleFlags(t *testing.T) {
func TestMessageCreatedWithIgnoreMissingMailbox(t *testing.T) {
runOneToOneTestClientWithAuth(t, defaultServerOptions(t), func(c *client.Client, s *testSession) {
mailboxID := s.mailboxCreated("user", []string{"mbox"})
s.setUpdatesAllowedToFail("user", true)
{
// First round fails as a missing mailbox is not allowed.
s.messageCreatedWithMailboxes("user", []imap.MailboxID{mailboxID, "THIS MAILBOX DOES NOT EXISTS"}, []byte("To: Test"), time.Now())
Expand Down

0 comments on commit 7dc070e

Please sign in to comment.