Skip to content

Commit

Permalink
fix(GODT-1642): Storage database transactions
Browse files Browse the repository at this point in the history
Add transaction control to the store interface and update existing write
methods that interact with the store to use a transaction.

Since we can't guarantee that the database and the store transactions
commit at the same time, we will commit the store first and then the
database as we can afford to have stale data in the store, but not in
the IMAP state.

The other store implementations have been removed as they don't provide
any transaction mechanism and haven't been used in a while.
  • Loading branch information
LBeernaertProton authored and jameshoulahan committed Sep 2, 2022
1 parent 6073aff commit c296d12
Show file tree
Hide file tree
Showing 24 changed files with 148 additions and 622 deletions.
2 changes: 1 addition & 1 deletion benchmarks/gluon_bench/store_benchmarks/badger_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ func (*BadgerStoreBuilder) New(path string) (store.Store, error) {
}

func init() {
RegisterStoreBuilder("badger", &BadgerStoreBuilder{})
RegisterStoreBuilder("default", &BadgerStoreBuilder{})
}
4 changes: 3 additions & 1 deletion benchmarks/gluon_bench/store_benchmarks/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ func (*Create) Run(ctx context.Context, st store.Store) (*reporter.BenchmarkRun,

for i := uint(0); i < *flags.StoreItemCount; i++ {
dc.Start()
err := s.Set(imap.InternalMessageID(uuid.NewString()), data)
err := store.Tx(st, func(transaction store.Transaction) error {
return transaction.Set(imap.InternalMessageID(uuid.NewString()), data)
})
dc.Stop()

if err != nil {
Expand Down
16 changes: 0 additions & 16 deletions benchmarks/gluon_bench/store_benchmarks/default_store.go

This file was deleted.

4 changes: 3 additions & 1 deletion benchmarks/gluon_bench/store_benchmarks/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ func (d *Delete) Run(ctx context.Context, st store.Store) (*reporter.BenchmarkRu
return RunStoreWorkersSplitRange(ctx, st, uint(len(d.uuids)), func(ctx context.Context, s store.Store, dc *timing.Collector, start, end uint) error {
for i := start; i < end; i++ {
dc.Start()
err := s.Delete(d.uuids[i])
err := store.Tx(st, func(transaction store.Transaction) error {
return transaction.Delete(d.uuids[i])
})
dc.Stop()

if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions benchmarks/gluon_bench/store_benchmarks/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ import (
"github.com/google/uuid"
)

func CreateRandomState(store store.Store, count uint) ([]imap.InternalMessageID, error) {
func CreateRandomState(st store.Store, count uint) ([]imap.InternalMessageID, error) {
uuids := make([]imap.InternalMessageID, 0, count)
data := make([]byte, *flags.StoreItemSize)

for i := uint(0); i < count; i++ {
uuid := imap.InternalMessageID(uuid.NewString())

if err := store.Set(uuid, data); err != nil {
if err := store.Tx(st, func(transaction store.Transaction) error {
return transaction.Set(uuid, data)
}); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newBuilder() (*serverBuilder, error) {
return &serverBuilder{
delim: "/",
cmdExecProfBuilder: &profiling.NullCmdExecProfilerBuilder{},
storeBuilder: &store.OnDiskStoreBuilder{},
storeBuilder: &store.BadgerStoreBuilder{},
}, nil
}

Expand Down
41 changes: 21 additions & 20 deletions internal/backend/connector_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backend
import (
"context"
"fmt"
"github.com/ProtonMail/gluon/store"
"strings"

"github.com/ProtonMail/gluon/imap"
Expand Down Expand Up @@ -162,31 +163,31 @@ func (user *user) applyMessagesCreated(ctx context.Context, update *imap.Message

remoteToLocalMessageID := make(map[imap.MessageID]imap.InternalMessageID)

for _, update := range updates {
internalID := uuid.NewString()
return db.WriteAndStore(ctx, user.db, user.store, func(ctx context.Context, tx *ent.Tx, storeTx store.Transaction) error {
for _, update := range updates {
internalID := uuid.NewString()

literal, err := rfc822.SetHeaderValue(update.Literal, ids.InternalIDKey, internalID)
if err != nil {
return fmt.Errorf("failed to set internal ID: %w", err)
}
literal, err := rfc822.SetHeaderValue(update.Literal, ids.InternalIDKey, internalID)
if err != nil {
return fmt.Errorf("failed to set internal ID: %w", err)
}

if err := user.store.Set(imap.InternalMessageID(internalID), literal); err != nil {
return fmt.Errorf("failed to store message literal: %w", err)
}
if err := storeTx.Set(imap.InternalMessageID(internalID), literal); err != nil {
return fmt.Errorf("failed to store message literal: %w", err)
}

reqs = append(reqs, &db.CreateMessageReq{
Message: update.Message,
Literal: literal,
Body: update.Body,
Structure: update.Structure,
Envelope: update.Envelope,
InternalID: imap.InternalMessageID(internalID),
})
reqs = append(reqs, &db.CreateMessageReq{
Message: update.Message,
Literal: literal,
Body: update.Body,
Structure: update.Structure,
Envelope: update.Envelope,
InternalID: imap.InternalMessageID(internalID),
})

remoteToLocalMessageID[update.Message.ID] = imap.InternalMessageID(internalID)
}
remoteToLocalMessageID[update.Message.ID] = imap.InternalMessageID(internalID)
}

return user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
if _, err := db.CreateMessages(ctx, tx, reqs...); err != nil {
return fmt.Errorf("failed to create message: %w", err)
}
Expand Down
20 changes: 12 additions & 8 deletions internal/backend/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (user *user) close(ctx context.Context) error {
}

func (user *user) deleteAllMessagesMarkedDeleted(ctx context.Context) error {
return user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
return db.WriteAndStore(ctx, user.db, user.store, func(ctx context.Context, tx *ent.Tx, stx store.Transaction) error {
ids, err := db.GetMessageIDsMarkedDeleted(ctx, tx.Client())
if err != nil {
return err
Expand All @@ -126,7 +126,7 @@ func (user *user) deleteAllMessagesMarkedDeleted(ctx context.Context) error {
return err
}

return user.store.Delete(ids...)
return stx.Delete(ids...)
})
}

Expand Down Expand Up @@ -199,13 +199,17 @@ func (user *user) removeState(ctx context.Context, st *state.State) error {
// After this point we need to notify the WaitGroup or we risk deadlocks.
defer user.statesWG.Done()

if err := user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
return db.DeleteMessages(ctx, tx, messageIDs...)
}); err != nil {
return err
}
if err := db.WriteAndStore(ctx, user.db, user.store, func(ctx context.Context, tx *ent.Tx, stx store.Transaction) error {
if err := db.DeleteMessages(ctx, tx, messageIDs...); err != nil {
return err
}

if err := user.store.Delete(messageIDs...); err != nil {
if err := stx.Delete(messageIDs...); err != nil {
return err
}

return nil
}); err != nil {
return err
}

Expand Down
21 changes: 21 additions & 0 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package db
import (
"context"
"fmt"
"github.com/ProtonMail/gluon/store"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -103,3 +104,23 @@ func NewDB(dir, userID string) (*DB, error) {

return &DB{db: client}, nil
}

// WriteAndStore is the same as WriteStoreAndResult.
func WriteAndStore(ctx context.Context, db *DB, st store.Store, fn func(context.Context, *ent.Tx, store.Transaction) error) error {
return db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
return store.Tx(st, func(transaction store.Transaction) error {
return fn(ctx, tx, transaction)
})
})
}

// WriteAndStoreResult wraps the two transactions from the SQL and storage databases. The store transaction is wrapped by
// the sql transaction. It is more important to guarantee that the SQL db is consistent, and we accept some unnecessary
// changes in the storage db, we can always recover from these more easily.
func WriteAndStoreResult[T any](ctx context.Context, db *DB, st store.Store, fn func(context.Context, *ent.Tx, store.Transaction) (T, error)) (T, error) {
return WriteResult(ctx, db, func(ctx context.Context, tx *ent.Tx) (T, error) {
return store.TxResult(st, func(transaction store.Transaction) (T, error) {
return fn(ctx, tx, transaction)
})
})
}
4 changes: 3 additions & 1 deletion internal/state/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package state
import (
"context"
"fmt"
"github.com/ProtonMail/gluon/store"
"strings"
"time"

Expand Down Expand Up @@ -81,6 +82,7 @@ func (state *State) actionUpdateMailbox(ctx context.Context, tx *ent.Tx, mboxID
func (state *State) actionCreateMessage(
ctx context.Context,
tx *ent.Tx,
stx store.Transaction,
mboxID ids.MailboxIDPair,
literal []byte,
flags imap.FlagSet,
Expand All @@ -106,7 +108,7 @@ func (state *State) actionCreateMessage(
return 0, fmt.Errorf("failed to set internal ID: %w", err)
}

if err := state.user.GetStore().Set(internalID, literal); err != nil {
if err := stx.Set(internalID, literal); err != nil {
return 0, fmt.Errorf("failed to store message literal: %w", err)
}

Expand Down
5 changes: 3 additions & 2 deletions internal/state/mailbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package state

import (
"context"
"github.com/ProtonMail/gluon/store"
"time"

"github.com/ProtonMail/gluon/imap"
Expand Down Expand Up @@ -146,8 +147,8 @@ func (m *Mailbox) Append(ctx context.Context, literal []byte, flags imap.FlagSet
}
}

return db.WriteResult(ctx, m.state.db(), func(ctx context.Context, tx *ent.Tx) (int, error) {
return m.state.actionCreateMessage(ctx, tx, m.snap.mboxID, literal, flags, date)
return db.WriteAndStoreResult(ctx, m.state.db(), m.state.user.GetStore(), func(ctx context.Context, tx *ent.Tx, transaction store.Transaction) (int, error) {
return m.state.actionCreateMessage(ctx, tx, transaction, m.snap.mboxID, literal, flags, date)
})
}

Expand Down
42 changes: 28 additions & 14 deletions store/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type BadgerStore struct {
wg sync.WaitGroup
}

type badgerTransaction struct {
tx *badger.Txn
}

func NewBadgerStore(path string, userID string, encryptionPassphrase []byte) (*BadgerStore, error) {
db, err := badger.Open(badger.DefaultOptions(filepath.Join(path, userID)).
WithLogger(logrus.StandardLogger()).
Expand Down Expand Up @@ -98,22 +102,32 @@ func (b *BadgerStore) Get(messageID imap.InternalMessageID) ([]byte, error) {
return data, nil
}

func (b *BadgerStore) Set(messageID imap.InternalMessageID, literal []byte) error {
return b.db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(messageID), literal)
})
func (b *BadgerStore) NewTransaction() Transaction {
return &badgerTransaction{tx: b.db.NewTransaction(true)}
}

func (b *BadgerStore) Delete(messageID ...imap.InternalMessageID) error {
return b.db.Update(func(txn *badger.Txn) error {
for _, v := range messageID {
if err := txn.Delete([]byte(v)); err != nil {
return err
}
func (b *badgerTransaction) Set(messageID imap.InternalMessageID, literal []byte) error {
return b.tx.Set([]byte(messageID), literal)
}

func (b *badgerTransaction) Delete(messageID ...imap.InternalMessageID) error {
for _, v := range messageID {
if err := b.tx.Delete([]byte(v)); err != nil {
return err
}
}

return nil
})
return nil
}

func (b *badgerTransaction) Commit() error {
return b.tx.Commit()
}

func (b *badgerTransaction) Rollback() error {
b.tx.Discard()

return nil
}

func (b *BadgerStore) Close() error {
Expand All @@ -125,6 +139,6 @@ func (b *BadgerStore) Close() error {

type BadgerStoreBuilder struct{}

func (*BadgerStoreBuilder) New(directory, userID, encryptionPassphrase string) (Store, error) {
return NewBadgerStore(directory, userID, []byte(encryptionPassphrase))
func (*BadgerStoreBuilder) New(directory, userID string, encryptionPassphrase []byte) (Store, error) {
return NewBadgerStore(directory, userID, encryptionPassphrase)
}
6 changes: 0 additions & 6 deletions store/compressor.go

This file was deleted.

43 changes: 0 additions & 43 deletions store/compressor_gzip.go

This file was deleted.

Loading

0 comments on commit c296d12

Please sign in to comment.