Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discard key versions during compaction #471

Closed
wants to merge 7 commits (source and target branch names lost in page extraction)
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions cmd/badger/cmd/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/table"
"github.com/dgraph-io/badger/y"
humanize "github.com/dustin/go-humanize"
"github.com/spf13/cobra"
)
Expand All @@ -46,6 +47,11 @@ to the Dgraph team.
fmt.Println("Error:", err.Error())
os.Exit(1)
}
err = tableInfo(sstDir, vlogDir)
if err != nil {
fmt.Println("Error:", err.Error())
os.Exit(1)
}
},
}

Expand All @@ -61,6 +67,29 @@ func dur(src, dst time.Time) string {
return humanize.RelTime(dst, src, "earlier", "later")
}

// tableInfo opens the badger DB at dir (SSTs) and valueDir (value log) in
// read-only mode and prints one line per SSTable: its level, file ID, and the
// key range it covers. Keys are stored with a version/timestamp suffix, so
// each endpoint is split into key bytes (hex) and version before printing.
//
// Returns an error if dir is empty or the DB cannot be opened.
func tableInfo(dir, valueDir string) error {
	if dir == "" {
		return fmt.Errorf("--dir not supplied")
	}
	// BUG FIX: the original assigned the package-level sstDir/vlogDir here,
	// silently ignoring the dir/valueDir parameters. Use the parameters so the
	// function honors whatever the caller passes in.
	opts := badger.DefaultOptions
	opts.Dir = dir
	opts.ValueDir = valueDir
	// Read-only: we only inspect table metadata; never mutate the DB.
	opts.ReadOnly = true

	db, err := badger.Open(opts)
	if err != nil {
		return err
	}
	defer db.Close()

	for _, t := range db.Tables() {
		lk, lv := y.ParseKey(t.Left), y.ParseTs(t.Left)
		rk, rv := y.ParseKey(t.Right), y.ParseTs(t.Right)
		fmt.Printf("SSTable [L%d, %03d] [%20X, v%-10d -> %20X, v%-10d]\n",
			t.Level, t.ID, lk, lv, rk, rv)
	}
	return nil
}

func printInfo(dir, valueDir string) error {
if dir == "" {
return fmt.Errorf("--dir not supplied")
Expand Down
17 changes: 16 additions & 1 deletion cmd/badger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,23 @@

package main

import "github.com/dgraph-io/badger/cmd/badger/cmd"
import (
"fmt"
"net/http"

"github.com/dgraph-io/badger/cmd/badger/cmd"
)

// main starts a background HTTP listener for /debug endpoints on the first
// free localhost port in [8080, 9080), then runs the badger CLI.
//
// NOTE(review): nothing here registers handlers on http.DefaultServeMux;
// presumably a transitive import registers net/http/pprof — confirm, otherwise
// the /debug server serves only 404s.
func main() {
	go func() {
		for port := 8080; port < 9080; port++ {
			fmt.Printf("Listening for /debug HTTP requests at port: %d\n", port)
			// ListenAndServe blocks while serving and only returns on error
			// (typically: the port is already taken), so a non-nil error just
			// means we should move on to the next candidate port.
			if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", port), nil); err != nil {
				fmt.Println("Port busy. Trying another one...")
			}
		}
	}()
	cmd.Execute()
}
8 changes: 6 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"log"
"math"
"sync"

"golang.org/x/net/trace"
Expand All @@ -37,7 +38,7 @@ type keyRange struct {
var infRange = keyRange{inf: true}

// String implements fmt.Stringer. The endpoints are rendered as hex so that
// raw (binary) key bytes remain readable in traces and logs.
func (r keyRange) String() string {
	const layout = "[left=%x, right=%x, inf=%v]"
	return fmt.Sprintf(layout, r.left, r.right, r.inf)
}

func (r keyRange) equals(dst keyRange) bool {
Expand Down Expand Up @@ -75,7 +76,10 @@ func getKeyRange(tables []*table.Table) keyRange {
biggest = tables[i].Biggest()
}
}
return keyRange{left: smallest, right: biggest}
return keyRange{
left: y.KeyWithTs(y.ParseKey(smallest), math.MaxUint64),
right: y.KeyWithTs(y.ParseKey(biggest), 0),
}
}

type levelCompactStatus struct {
Expand Down
4 changes: 4 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,10 @@ func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
return seq, err
}

// Tables returns metadata (ID, level, and key range) for every SSTable the
// levels controller currently tracks, sorted by level and then by table ID.
func (db *DB) Tables() []TableInfo {
	return db.lc.getTableInfo()
}

// MergeOperator represents a Badger merge operator.
type MergeOperator struct {
sync.RWMutex
Expand Down
183 changes: 118 additions & 65 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package badger

import (
"fmt"
"math"
"math/rand"
"os"
"sort"
Expand Down Expand Up @@ -262,6 +263,24 @@ func (s *levelsController) compactBuildTables(
topTables := cd.top
botTables := cd.bot

var hasOverlap bool
{
kr := getKeyRange(cd.top)
for i, lh := range s.levels {
if i <= l { // Skip upper levels.
continue
}
lh.RLock()
left, right := lh.overlappingTables(levelHandlerRLocked{}, kr)
lh.RUnlock()
if right-left > 0 {
hasOverlap = true
break
}
}
cd.elog.LazyPrintf("Key range overlaps with lower levels: %v", hasOverlap)
}

// Create iterators across all the tables involved first.
var iters []y.Iterator
if l == 0 {
Expand All @@ -278,54 +297,90 @@ func (s *levelsController) compactBuildTables(

it.Rewind()

readTs := s.kv.orc.readTs()
// Start generating new tables.
type newTableResult struct {
table *table.Table
err error
}
resultCh := make(chan newTableResult)
var i int
for ; it.Valid(); i++ {
var numBuilds int
var lastKey, skipKey []byte
for it.Valid() {
timeStart := time.Now()
builder := table.NewTableBuilder()
var numKeys, numSkips uint64
for ; it.Valid(); it.Next() {
if builder.ReachedCapacity(s.kv.opt.MaxTableSize) {
break
if !y.SameKey(it.Key(), lastKey) {
if builder.ReachedCapacity(s.kv.opt.MaxTableSize) {
// Only break if we are on a different key, and have reached capacity. We want
// to ensure that all versions of the key are stored in the same sstable, and
// not divided across multiple tables at the same level.
break
}
lastKey = y.SafeCopy(lastKey, it.Key())
}
if len(skipKey) > 0 {
if y.SameKey(it.Key(), skipKey) {
numSkips++
continue
} else {
skipKey = skipKey[:0]
}
}

vs := it.Value()
version := y.ParseTs(it.Key())
if version < readTs && isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
// If this version of the key is deleted or expired, skip all the rest of the
// versions. Ensure that we're only removing versions below readTs.
skipKey = y.SafeCopy(skipKey, it.Key())

if !hasOverlap {
// If no overlap, we can skip all the versions, by continuing here.
numSkips++
continue // Skip adding this key.
} else {
// If this key range has overlap with lower levels, then keep the deletion
// marker with the latest version, discarding the rest. This logic here
// would not continue, but has set the skipKey for the future iterations.
}
}
numKeys++
y.Check(builder.Add(it.Key(), it.Value()))
}
// It was true that it.Valid() at least once in the loop above, which means we
// called Add() at least once, and builder is not Empty().
y.AssertTrue(!builder.Empty())

cd.elog.LazyPrintf("LOG Compact. Iteration to generate one table took: %v\n", time.Since(timeStart))

fileID := s.reserveFileID()
go func(builder *table.Builder) {
defer builder.Close()

fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
if err != nil {
resultCh <- newTableResult{nil, errors.Wrapf(err, "While opening new table: %d", fileID)}
return
}
cd.elog.LazyPrintf("Added %d keys. Skipped %d keys.", numKeys, numSkips)
cd.elog.LazyPrintf("LOG Compact. Iteration took: %v\n", time.Since(timeStart))
if !builder.Empty() {
numBuilds++
fileID := s.reserveFileID()
go func(builder *table.Builder) {
defer builder.Close()

fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
if err != nil {
resultCh <- newTableResult{nil, errors.Wrapf(err, "While opening new table: %d", fileID)}
return
}

if _, err := fd.Write(builder.Finish()); err != nil {
resultCh <- newTableResult{nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)}
return
}
if _, err := fd.Write(builder.Finish()); err != nil {
resultCh <- newTableResult{nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)}
return
}

tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode)
// decrRef is added below.
resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())}
}(builder)
tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode)
// decrRef is added below.
resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())}
}(builder)
}
}

newTables := make([]*table.Table, 0, 20)

// Wait for all table builders to finish.
var firstErr error
for x := 0; x < i; x++ {
for x := 0; x < numBuilds; x++ {
res := <-resultCh
newTables = append(newTables, res.table)
if firstErr == nil {
Expand All @@ -343,7 +398,7 @@ func (s *levelsController) compactBuildTables(
if firstErr != nil {
// An error happened. Delete all the newly created table files (by calling DecrRef
// -- we're the only holders of a ref).
for j := 0; j < i; j++ {
for j := 0; j < numBuilds; j++ {
if newTables[j] != nil {
newTables[j].DecrRef()
}
Expand Down Expand Up @@ -446,8 +501,10 @@ func (s *levelsController) fillTables(cd *compactDef) bool {
for _, t := range tbls {
cd.thisSize = t.Size()
cd.thisRange = keyRange{
left: t.Smallest(),
right: t.Biggest(),
// We pick all the versions of the smallest and the biggest key.
left: y.KeyWithTs(y.ParseKey(t.Smallest()), math.MaxUint64),
// Note that version zero would be the rightmost key.
right: y.KeyWithTs(y.ParseKey(t.Biggest()), 0),
}
if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
continue
Expand Down Expand Up @@ -486,40 +543,8 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
thisLevel := cd.thisLevel
nextLevel := cd.nextLevel

if thisLevel.level >= 1 && len(cd.bot) == 0 {
y.AssertTrue(len(cd.top) == 1)
tbl := cd.top[0]

// We write to the manifest _before_ we delete files (and after we created files).
changes := []*protos.ManifestChange{
// The order matters here -- you can't temporarily have two copies of the same
// table id when reloading the manifest.
makeTableDeleteChange(tbl.ID()),
makeTableCreateChange(tbl.ID(), nextLevel.level),
}
if err := s.kv.manifest.addChanges(changes); err != nil {
return err
}

// We have to add to nextLevel before we remove from thisLevel, not after. This way, we
// don't have a bug where reads would see keys missing from both levels.

// Note: It's critical that we add tables (replace them) in nextLevel before deleting them
// in thisLevel. (We could finagle it atomically somehow.) Also, when reading we must
// read, or at least acquire s.RLock(), in increasing order by level, so that we don't skip
// a compaction.

if err := nextLevel.replaceTables(cd.top); err != nil {
return err
}
if err := thisLevel.deleteTables(cd.top); err != nil {
return err
}

cd.elog.LazyPrintf("\tLOG Compact-Move %d->%d smallest:%s biggest:%s took %v\n",
l, l+1, string(tbl.Smallest()), string(tbl.Biggest()), time.Since(timeStart))
return nil
}
// Table should never be moved directly between levels, always be rewritten to allow discarding
// invalid versions.

newTables, decr, err := s.compactBuildTables(l, cd)
if err != nil {
Expand Down Expand Up @@ -561,7 +586,7 @@ func (s *levelsController) doCompact(p compactionPriority) (bool, error) {
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.

cd := compactDef{
elog: trace.New("Badger", "Compact"),
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
thisLevel: s.levels[l],
nextLevel: s.levels[l+1],
}
Expand Down Expand Up @@ -704,3 +729,31 @@ func (s *levelsController) appendIterators(
}
return iters
}

// TableInfo describes one SSTable in the LSM tree: its file ID, the level it
// resides on, and its smallest (Left) and biggest (Right) keys. The keys are
// in their stored form, i.e. they still carry the version/timestamp suffix
// (split them with y.ParseKey / y.ParseTs).
type TableInfo struct {
	ID uint64
	Level int
	Left []byte
	Right []byte
}

// getTableInfo collects ID, level, and key-range metadata for every table on
// every level and returns the slice sorted by (level, table ID).
func (s *levelsController) getTableInfo() (result []TableInfo) {
	for _, l := range s.levels {
		// l.tables is mutated by concurrent compactions; other readers in this
		// file take the handler's read lock before touching it (see the
		// overlappingTables call site), so do the same here to avoid a race.
		l.RLock()
		for _, t := range l.tables {
			info := TableInfo{
				ID:    t.ID(),
				Level: l.level,
				Left:  t.Smallest(),
				Right: t.Biggest(),
			}
			result = append(result, info)
		}
		l.RUnlock()
	}
	sort.Slice(result, func(i, j int) bool {
		if result[i].Level != result[j].Level {
			return result[i].Level < result[j].Level
		}
		return result[i].ID < result[j].ID
	})
	return
}