Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync active memtable and value log on Db.Sync (#1847) #1953

Merged
merged 5 commits into from
Jun 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,38 @@ const (
// Sync syncs database content to disk. This function provides
// more control to user to sync data whenever required.
func (db *DB) Sync() error {
return db.vlog.sync()
/**
Make an attempt to sync both the logs, the active memtable's WAL and the vLog (1847).
Cases:
- All_ok :: If both the logs sync successfully.

- Entry_Lost :: If an entry with a value pointer was present in the active memtable's WAL,
:: and the WAL was synced but there was an error in syncing the vLog.
:: The entry will be considered lost and this case will need to be handled during recovery.

- Entries_Lost :: If there were errors in syncing both the logs, multiple entries would be lost.

- Entries_Lost :: If the active memtable's WAL is not synced but the vLog is synced, it will
:: result in entries being lost because recovery of the active memtable is done from its WAL.
:: Check `UpdateSkipList` in memtable.go.

- Nothing_lost :: If an entry with its value was present in the active memtable's WAL, and the WAL was synced,
:: but there was an error in syncing the vLog.
:: Nothing is lost for this very specific entry because the entry is completely present in the memtable's WAL.

- Partially_lost :: If entries were written partially in either of the logs,
:: the logs will be truncated during recovery.
:: As a result of truncation, some entries might be lost.
:: Assume that 4KB of data is to be synced and invoking `Sync` results only in syncing 3KB
:: of data and then the machine shuts down or the disk failure happens,
:: this will result in partial writes. [[This case needs verification]]
*/
db.lock.RLock()
memtableSyncError := db.mt.SyncWAL()
mangalaman93 marked this conversation as resolved.
Show resolved Hide resolved
db.lock.RUnlock()

vLogSyncError := db.vlog.sync()
return y.CombineErrors(memtableSyncError, vLogSyncError)
}

// getMemtables returns the current memtables and get references.
Expand Down
76 changes: 76 additions & 0 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,3 +1059,79 @@ func TestKeyCount(t *testing.T) {
require.NoError(t, stream.Orchestrate(context.Background()))
require.Equal(t, N, uint64(count))
}

func TestAssertValueLogIsNotWrittenToOnStartup(t *testing.T) {
opt := DefaultOptions("").WithValueLogFileSize(1 << 20).WithValueThreshold(1 << 4)

dir, err := os.MkdirTemp(".", "badger-test")
require.NoError(t, err)
defer removeDir(dir)

openDb := func(readonly bool) *DB {
opts := &opt
opts.Dir = dir
opts.ValueDir = dir
if readonly {
opts.ReadOnly = true
}

if opts.InMemory {
opts.Dir = ""
opts.ValueDir = ""
}
db, err := Open(*opts)
require.NoError(t, err)

return db
}

key := func(i int) string {
return fmt.Sprintf("key%100d", i)
}

assertOnLoadDb := func(db *DB) uint32 {
data := []byte(fmt.Sprintf("value%100d", 1))
for i := 0; i < 20; i++ {
err := db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte(key(i)), data))
})
require.NoError(t, err)
}
return db.vlog.maxFid
}

latestVLogFileSize := func(db *DB, vLogId uint32) uint32 {
return db.vlog.filesMap[vLogId].size.Load()
}

assertOnReadDb := func(db *DB) {
for i := 0; i < 20; i++ {
err := db.View(func(txn *Txn) error {
item, err := txn.Get([]byte(key(i)))
require.NoError(t, err, "Getting key: %s", key(i))
err = item.Value(func(v []byte) error {
_ = v
return nil
})
require.NoError(t, err, "Getting value for the key: %s", key(i))
return nil
})
require.NoError(t, err)
}
}

db := openDb(false)
vLogFileSize := latestVLogFileSize(db, assertOnLoadDb(db))
assertOnReadDb(db)

require.NoError(t, db.Sync())
require.NoError(t, db.Close())

db = openDb(true)
defer func() {
require.NoError(t, db.Close())
}()

assertOnReadDb(db)
require.Equal(t, latestVLogFileSize(db, db.vlog.maxFid), vLogFileSize)
}
61 changes: 61 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2035,6 +2035,67 @@ func TestSyncForRace(t *testing.T) {
<-doneChan
}

func TestSyncForNoErrors(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)

db, err := Open(DefaultOptions(dir).WithSyncWrites(false))
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()

txn := db.NewTransaction(true)
for i := 0; i < 10; i++ {
require.NoError(
t,
txn.SetEntry(NewEntry(
[]byte(fmt.Sprintf("key%d", i)),
[]byte(fmt.Sprintf("value%d", i)),
)),
)
}
require.NoError(t, txn.Commit())

if err := db.Sync(); err != nil {
require.NoError(t, err)
}
}

func TestSyncForReadingTheEntriesThatWereSynced(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)

db, err := Open(DefaultOptions(dir).WithSyncWrites(false))
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()

txn := db.NewTransaction(true)
for i := 0; i < 10; i++ {
require.NoError(
t,
txn.SetEntry(NewEntry(
[]byte(fmt.Sprintf("key%d", i)),
[]byte(fmt.Sprintf("value%d", i)),
)),
)
}
require.NoError(t, txn.Commit())

if err := db.Sync(); err != nil {
require.NoError(t, err)
}

readOnlyTxn := db.NewTransaction(false)
for i := 0; i < 10; i++ {
item, err := readOnlyTxn.Get([]byte(fmt.Sprintf("key%d", i)))
require.NoError(t, err)

value := getItemValue(t, item)
require.Equal(t, []byte(fmt.Sprintf("value%d", i)), value)
}
}

func TestForceFlushMemtable(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err, "temp dir for badger could not be created")
Expand Down
13 changes: 13 additions & 0 deletions y/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,16 @@ func Wrapf(err error, format string, args ...interface{}) error {
}
return errors.Wrapf(err, format, args...)
}

func CombineErrors(one, other error) error {
if one != nil && other != nil {
return fmt.Errorf("%v; %v", one, other)
}
if one != nil && other == nil {
return fmt.Errorf("%v", one)
}
if one == nil && other != nil {
return fmt.Errorf("%v", other)
}
return nil
}
28 changes: 28 additions & 0 deletions y/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package y

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
)

func TestCombineWithBothErrorsPresent(t *testing.T) {
combinedError := CombineErrors(errors.New("one"), errors.New("two"))
require.Equal(t, "one; two", combinedError.Error())
}

func TestCombineErrorsWithOneErrorPresent(t *testing.T) {
combinedError := CombineErrors(errors.New("one"), nil)
require.Equal(t, "one", combinedError.Error())
}

func TestCombineErrorsWithOtherErrorPresent(t *testing.T) {
combinedError := CombineErrors(nil, errors.New("other"))
require.Equal(t, "other", combinedError.Error())
}

func TestCombineErrorsWithBothErrorsAsNil(t *testing.T) {
combinedError := CombineErrors(nil, nil)
require.NoError(t, combinedError)
}