Skip to content

Commit

Permalink
Sync active memtable and value log on Db.Sync (#1847) (#1953)
Browse files Browse the repository at this point in the history
Fixes #1847

Currently, `DB.Sync()` only syncs the value log but not the WAL of the
active memtable, however recovery happens from the WAL of the active
memtable as a part of the `DB.Open()` method.

This change attempts to sync both the logs, however there can be an
issue in syncing one of the logs. This change lists all of the possible
cases that can arise during the sync operations. Some of these issues
will be taken separately. For example,
the issue #1954 can arise and
shall be handled separately.

This change adds a few tests to ensure that the Sync behavior is not
broken. This change also adds a learning test `(db2_test.go ->
TestAssertValueLogIsNotWrittenToOnStartup)` to ensure that value log is
only read from (and not written to) during startup. This learning shall
be used in the next issue (the one described above (*))
  • Loading branch information
SarthakMakhija authored Jun 3, 2023
1 parent f9b9e4d commit 4b0d919
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 1 deletion.
33 changes: 32 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,38 @@ const (
// Sync syncs database content to disk. This function provides
// more control to user to sync data whenever required.
func (db *DB) Sync() error {
return db.vlog.sync()
/**
Make an attempt to sync both the logs, the active memtable's WAL and the vLog (1847).
Cases:
- All_ok :: If both the logs sync successfully.
- Entry_Lost :: If an entry with a value pointer was present in the active memtable's WAL,
:: and the WAL was synced but there was an error in syncing the vLog.
:: The entry will be considered lost and this case will need to be handled during recovery.
- Entries_Lost :: If there were errors in syncing both the logs, multiple entries would be lost.
- Entries_Lost :: If the active memtable's WAL is not synced but the vLog is synced, it will
:: result in entries being lost because recovery of the active memtable is done from its WAL.
:: Check `UpdateSkipList` in memtable.go.
- Nothing_lost :: If an entry with its value was present in the active memtable's WAL, and the WAL was synced,
:: but there was an error in syncing the vLog.
:: Nothing is lost for this very specific entry because the entry is completely present in the memtable's WAL.
- Partially_lost :: If entries were written partially in either of the logs,
:: the logs will be truncated during recovery.
:: As a result of truncation, some entries might be lost.
:: Assume that 4KB of data is to be synced and invoking `Sync` results only in syncing 3KB
:: of data and then the machine shuts down or the disk failure happens,
:: this will result in partial writes. [[This case needs verification]]
*/
db.lock.RLock()
memtableSyncError := db.mt.SyncWAL()
db.lock.RUnlock()

vLogSyncError := db.vlog.sync()
return y.CombineErrors(memtableSyncError, vLogSyncError)
}

// getMemtables returns the current memtables and get references.
Expand Down
76 changes: 76 additions & 0 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,3 +1059,79 @@ func TestKeyCount(t *testing.T) {
require.NoError(t, stream.Orchestrate(context.Background()))
require.Equal(t, N, uint64(count))
}

func TestAssertValueLogIsNotWrittenToOnStartup(t *testing.T) {
opt := DefaultOptions("").WithValueLogFileSize(1 << 20).WithValueThreshold(1 << 4)

dir, err := os.MkdirTemp(".", "badger-test")
require.NoError(t, err)
defer removeDir(dir)

openDb := func(readonly bool) *DB {
opts := &opt
opts.Dir = dir
opts.ValueDir = dir
if readonly {
opts.ReadOnly = true
}

if opts.InMemory {
opts.Dir = ""
opts.ValueDir = ""
}
db, err := Open(*opts)
require.NoError(t, err)

return db
}

key := func(i int) string {
return fmt.Sprintf("key%100d", i)
}

assertOnLoadDb := func(db *DB) uint32 {
data := []byte(fmt.Sprintf("value%100d", 1))
for i := 0; i < 20; i++ {
err := db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte(key(i)), data))
})
require.NoError(t, err)
}
return db.vlog.maxFid
}

latestVLogFileSize := func(db *DB, vLogId uint32) uint32 {
return db.vlog.filesMap[vLogId].size.Load()
}

assertOnReadDb := func(db *DB) {
for i := 0; i < 20; i++ {
err := db.View(func(txn *Txn) error {
item, err := txn.Get([]byte(key(i)))
require.NoError(t, err, "Getting key: %s", key(i))
err = item.Value(func(v []byte) error {
_ = v
return nil
})
require.NoError(t, err, "Getting value for the key: %s", key(i))
return nil
})
require.NoError(t, err)
}
}

db := openDb(false)
vLogFileSize := latestVLogFileSize(db, assertOnLoadDb(db))
assertOnReadDb(db)

require.NoError(t, db.Sync())
require.NoError(t, db.Close())

db = openDb(true)
defer func() {
require.NoError(t, db.Close())
}()

assertOnReadDb(db)
require.Equal(t, latestVLogFileSize(db, db.vlog.maxFid), vLogFileSize)
}
61 changes: 61 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2035,6 +2035,67 @@ func TestSyncForRace(t *testing.T) {
<-doneChan
}

func TestSyncForNoErrors(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)

db, err := Open(DefaultOptions(dir).WithSyncWrites(false))
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()

txn := db.NewTransaction(true)
for i := 0; i < 10; i++ {
require.NoError(
t,
txn.SetEntry(NewEntry(
[]byte(fmt.Sprintf("key%d", i)),
[]byte(fmt.Sprintf("value%d", i)),
)),
)
}
require.NoError(t, txn.Commit())

if err := db.Sync(); err != nil {
require.NoError(t, err)
}
}

func TestSyncForReadingTheEntriesThatWereSynced(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)

db, err := Open(DefaultOptions(dir).WithSyncWrites(false))
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()

txn := db.NewTransaction(true)
for i := 0; i < 10; i++ {
require.NoError(
t,
txn.SetEntry(NewEntry(
[]byte(fmt.Sprintf("key%d", i)),
[]byte(fmt.Sprintf("value%d", i)),
)),
)
}
require.NoError(t, txn.Commit())

if err := db.Sync(); err != nil {
require.NoError(t, err)
}

readOnlyTxn := db.NewTransaction(false)
for i := 0; i < 10; i++ {
item, err := readOnlyTxn.Get([]byte(fmt.Sprintf("key%d", i)))
require.NoError(t, err)

value := getItemValue(t, item)
require.Equal(t, []byte(fmt.Sprintf("value%d", i)), value)
}
}

func TestForceFlushMemtable(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err, "temp dir for badger could not be created")
Expand Down
13 changes: 13 additions & 0 deletions y/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,16 @@ func Wrapf(err error, format string, args ...interface{}) error {
}
return errors.Wrapf(err, format, args...)
}

func CombineErrors(one, other error) error {
if one != nil && other != nil {
return fmt.Errorf("%v; %v", one, other)
}
if one != nil && other == nil {
return fmt.Errorf("%v", one)
}
if one == nil && other != nil {
return fmt.Errorf("%v", other)
}
return nil
}
28 changes: 28 additions & 0 deletions y/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package y

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
)

func TestCombineWithBothErrorsPresent(t *testing.T) {
combinedError := CombineErrors(errors.New("one"), errors.New("two"))
require.Equal(t, "one; two", combinedError.Error())
}

func TestCombineErrorsWithOneErrorPresent(t *testing.T) {
combinedError := CombineErrors(errors.New("one"), nil)
require.Equal(t, "one", combinedError.Error())
}

func TestCombineErrorsWithOtherErrorPresent(t *testing.T) {
combinedError := CombineErrors(nil, errors.New("other"))
require.Equal(t, "other", combinedError.Error())
}

func TestCombineErrorsWithBothErrorsAsNil(t *testing.T) {
combinedError := CombineErrors(nil, nil)
require.NoError(t, combinedError)
}

0 comments on commit 4b0d919

Please sign in to comment.