Skip to content

Commit

Permalink
Handle nil table in compaction and test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ibrahim Jarif committed Aug 14, 2020
1 parent 1c1e17c commit 233be8e
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 62 deletions.
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1448,7 +1448,7 @@ func (db *DB) Flatten(workers int) error {
errCh := make(chan error, 1)
for i := 0; i < workers; i++ {
go func() {
errCh <- db.lc.doCompact(174, cp)
errCh <- db.lc.doCompact(175, cp)
}()
}
var success int
Expand Down
2 changes: 1 addition & 1 deletion db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
require.NoError(t, err)

_, err = fd.Write(b.Finish())
_, err = fd.Write(b.Finish(false))
require.NoError(t, err, "unable to write to file")

tab, err := table.OpenTable(fd, bopts)
Expand Down
12 changes: 8 additions & 4 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"golang.org/x/net/trace"

"github.com/dgraph-io/badger/v2/manual"
"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/table"
"github.com/dgraph-io/badger/v2/y"
Expand Down Expand Up @@ -307,7 +306,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
// function in logs, and forces a compaction.
dropPrefixes: prefixes,
}
if err := s.doCompact(175, cp); err != nil {
if err := s.doCompact(174, cp); err != nil {
opt.Warningf("While compacting level 0: %v", err)
return nil
}
Expand Down Expand Up @@ -694,11 +693,16 @@ nextTable:
}
inflightBuilders.Done(err)

// If we couldn't build the table, return fast.
if err != nil {
return
}

mu.Lock()
newTables = append(newTables, tbl)
num := atomic.LoadInt32(&table.NumBlocks)
allocs := float64(atomic.LoadInt64(&manual.NumAllocs)) / float64((1 << 20))
fmt.Printf("Num Blocks: %d. Num Allocs (MB): %.2f\n", num, allocs)
allocs := float64(atomic.LoadInt64(&y.NumAllocs)) / float64((1 << 20))
s.kv.opt.Logger.Debugf("Num Blocks: %d. Num Allocs (MB): %.2f\n", num, allocs)
mu.Unlock()
}(builder)
}
Expand Down
96 changes: 41 additions & 55 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
panic(err)
}

if _, err = fd.Write(b.Finish()); err != nil {
if _, err = fd.Write(b.Finish(false)); err != nil {
panic(err)
}
tab, err := table.OpenTable(fd, opts)
Expand Down Expand Up @@ -740,7 +740,7 @@ func createEmptyTable(db *DB) *table.Table {
b.Add(y.KeyWithTs([]byte("foo"), 1), y.ValueStruct{}, 0)

// Open table in memory to avoid adding changes to manifest file.
tab, err := table.OpenInMemoryTable(b.Finish(), db.lc.reserveFileID(), &opts)
tab, err := table.OpenInMemoryTable(b.Finish(true), db.lc.reserveFileID(), &opts)
if err != nil {
panic(err)
}
Expand All @@ -749,52 +749,6 @@ func createEmptyTable(db *DB) *table.Table {
}

func TestL0Stall(t *testing.T) {
test := func(t *testing.T, opt *Options) {
runBadgerTest(t, opt, func(t *testing.T, db *DB) {
db.lc.levels[0].Lock()
// Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level
// zero and all new additions are expected to stall if L0 is in memory.
for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ {
db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db))
}
db.lc.levels[0].Unlock()

timeout := time.After(5 * time.Second)
done := make(chan bool)

go func() {
tab := createEmptyTable(db)
require.NoError(t, db.lc.addLevel0Table(tab))
tab.DecrRef()
done <- true
}()
// Let it stall for a second.
time.Sleep(time.Second)

select {
case <-timeout:
if opt.KeepL0InMemory {
t.Log("Timeout triggered")
// Mark this test as successful since L0 is in memory and the
// addition of new table to L0 is supposed to stall.

// Remove tables from level 0 so that the stalled
// compaction can make progress. This does not have any
// effect on the test. This is done so that the goroutine
// stuck on addLevel0Table can make progress and end.
db.lc.levels[0].Lock()
db.lc.levels[0].tables = nil
db.lc.levels[0].Unlock()
<-done
} else {
t.Fatal("Test didn't finish in time")
}
case <-done:
// The test completed before 5 second timeout. Mark it as successful.
}
})
}

opt := DefaultOptions("")
// Disable all compactions.
opt.NumCompactors = 0
Expand All @@ -803,13 +757,45 @@ func TestL0Stall(t *testing.T) {
// Addition of new tables will stall if there are 4 or more L0 tables.
opt.NumLevelZeroTablesStall = 4

t.Run("with KeepL0InMemory", func(t *testing.T) {
opt.KeepL0InMemory = true
test(t, &opt)
})
t.Run("with L0 on disk", func(t *testing.T) {
opt.KeepL0InMemory = false
test(t, &opt)
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
db.lc.levels[0].Lock()
// Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level
// zero and all new additions are expected to stall if L0 is in memory.
for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ {
db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db))
}
db.lc.levels[0].Unlock()

timeout := time.After(5 * time.Second)
done := make(chan bool)

go func() {
tab := createEmptyTable(db)
require.NoError(t, db.lc.addLevel0Table(tab))
tab.DecrRef()
done <- true
}()
// Let it stall for a second.
time.Sleep(time.Second)

select {
case <-timeout:
t.Log("Timeout triggered")
// Mark this test as successful since L0 is in memory and the
// addition of new table to L0 is supposed to stall.

// Remove tables from level 0 so that the stalled
// compaction can make progress. This does not have any
// effect on the test. This is done so that the goroutine
// stuck on addLevel0Table can make progress and end.
db.lc.levels[0].Lock()
db.lc.levels[0].tables = nil
db.lc.levels[0].Unlock()
<-done
case <-done:
// The test completed before 5 second timeout. Mark it as successful.
t.Fatal("Test did not stall")
}
})
}

Expand Down
2 changes: 1 addition & 1 deletion manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func buildTable(t *testing.T, keyValues [][]string, bopts table.Options) *os.Fil
UserMeta: 0,
}, 0)
}
_, err = f.Write(b.Finish())
_, err = f.Write(b.Finish(false))
require.NoError(t, err, "unable to write to file.")
f.Close()
f, _ = y.OpenSyncedFile(filename, true)
Expand Down

0 comments on commit 233be8e

Please sign in to comment.