Skip to content

Commit

Permalink
Make value log garbage collection more intelligent.
Browse files Browse the repository at this point in the history
We now keep track of the number of deletions happening in every
log file, and use that to pick a good candidate for garbage collection.
  • Loading branch information
deepakjois committed Oct 25, 2017
1 parent 3df5030 commit 5240a8f
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 5 deletions.
5 changes: 5 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,7 @@ func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error {
Key: y.KeyWithTs(key, item.version),
Meta: bitDelete,
})
db.vlog.updateGCStats(item)
}
return db.batchSet(entries)
}
Expand Down Expand Up @@ -947,6 +948,7 @@ func (db *DB) PurgeOlderVersions() error {
Key: y.KeyWithTs(lastKey, item.version),
Meta: bitDelete,
})
db.vlog.updateGCStats(item)
count++

// Batch up 1000 entries at a time and write
Expand Down Expand Up @@ -1002,6 +1004,7 @@ func (db *DB) RunValueLogGC(discardRatio float64) error {
return ErrInvalidRequest
}

// Find head on disk
headKey := y.KeyWithTs(head, math.MaxUint64)
// Need to pass with timestamp, lsm get removes the last 8 bytes and compares key
val, err := db.lc.get(headKey)
Expand All @@ -1013,5 +1016,7 @@ func (db *DB) RunValueLogGC(discardRatio float64) error {
if len(val.Value) > 0 {
head.Decode(val.Value)
}

// Pick a log file and run GC
return db.vlog.runGC(discardRatio, head)
}
3 changes: 2 additions & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ func TestPurgeVersionsBelow(t *testing.T) {
// Write 4 versions of the same key
for i := 0; i < 4; i++ {
err = db.Update(func(txn *Txn) error {
return txn.Set([]byte("answer"), []byte(fmt.Sprintf("%d", i)), 0)
return txn.Set([]byte("answer"), []byte(fmt.Sprintf("%25d", i)), 0)
})
require.NoError(t, err)
}
Expand Down Expand Up @@ -887,6 +887,7 @@ func TestPurgeVersionsBelow(t *testing.T) {
// Delete all versions below the 3rd version
err = db.PurgeVersionsBelow([]byte("answer"), ts)
require.NoError(t, err)
require.NotEmpty(t, db.vlog.lfDiscardStats.m)

// Verify that there are only 2 versions left
db.View(func(txn *Txn) error {
Expand Down
59 changes: 55 additions & 4 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"io"
"io/ioutil"
"log"
"math"
"math/rand"
"os"
"sort"
Expand Down Expand Up @@ -427,6 +428,13 @@ func (vlog *valueLog) deleteLogFile(lf *logFile) error {
return os.Remove(path)
}

// lfDiscardStats keeps track of the amount of data that could be discarded for
// a given logfile.
type lfDiscardStats struct {
sync.Mutex
m map[uint32]int64
}

type valueLog struct {
buf bytes.Buffer
dirPath string
Expand All @@ -444,7 +452,8 @@ type valueLog struct {
writableLogOffset uint32
opt Options

garbageCh chan struct{}
garbageCh chan struct{}
lfDiscardStats *lfDiscardStats
}

func vlogFilePath(dirPath string, fid uint32) string {
Expand Down Expand Up @@ -543,7 +552,7 @@ func (vlog *valueLog) Open(kv *DB, opt Options) error {

vlog.elog = trace.NewEventLog("Badger", "Valuelog")
vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.

vlog.lfDiscardStats = &lfDiscardStats{m: make(map[uint32]int64)}
return nil
}

Expand Down Expand Up @@ -815,6 +824,26 @@ func (vlog *valueLog) pickLog(head valuePointer) *logFile {
return nil
}

// Pick a candidate that contains the largest amount of discardable data
candidate := struct {
fid uint32
discard int64
}{math.MaxUint32, 0}
vlog.lfDiscardStats.Lock()
for j := 0; j < i; j++ {
fid := fids[j]
if vlog.lfDiscardStats.m[fid] > candidate.discard {
candidate.fid = fids[j]
candidate.discard = vlog.lfDiscardStats.m[fid]
}
}
vlog.lfDiscardStats.Unlock()

if candidate.fid != math.MaxUint32 { // Found a candidate
return vlog.filesMap[candidate.fid]
}

// Fallback to randomly picking a log file
idx := rand.Intn(i) // Don’t include head.Fid. We pick a random file before it.
if idx > 0 {
idx = rand.Intn(idx + 1) // Another level of rand to favor smaller fids.
Expand Down Expand Up @@ -842,12 +871,22 @@ func discardEntry(e entry, vs y.ValueStruct) bool {
return false
}

func (vlog *valueLog) doRunGC(gcThreshold float64, head valuePointer) error {
func (vlog *valueLog) doRunGC(gcThreshold float64, head valuePointer) (err error) {
// Pick a log file for GC
lf := vlog.pickLog(head)
if lf == nil {
return ErrNoRewrite
}

// Update stats before exiting
defer func() {
if err == nil {
vlog.lfDiscardStats.Lock()
delete(vlog.lfDiscardStats.m, lf.fid)
vlog.lfDiscardStats.Unlock()
}
}()

type reason struct {
total float64
keep float64
Expand All @@ -864,7 +903,7 @@ func (vlog *valueLog) doRunGC(gcThreshold float64, head valuePointer) error {

start := time.Now()
y.AssertTrue(vlog.kv != nil)
err := vlog.iterate(lf, 0, func(e entry, vp valuePointer) error {
err = vlog.iterate(lf, 0, func(e entry, vp valuePointer) error {
esz := float64(vp.Len) / (1 << 20) // in MBs. +4 for the CAS stuff.
skipped += esz
if skipped < skipFirstM {
Expand Down Expand Up @@ -960,10 +999,22 @@ func (vlog *valueLog) waitOnGC(lc *y.Closer) {
func (vlog *valueLog) runGC(gcThreshold float64, head valuePointer) error {
select {
case vlog.garbageCh <- struct{}{}:

// Run GC
err := vlog.doRunGC(gcThreshold, head)
<-vlog.garbageCh
return err
default:
return ErrRejected
}
}

func (vlog *valueLog) updateGCStats(item *Item) {
if item.meta&bitValuePointer > 0 {
var vp valuePointer
vp.Decode(item.vptr)
vlog.lfDiscardStats.Lock()
vlog.lfDiscardStats.m[vp.Fid] += int64(vp.Len)
vlog.lfDiscardStats.Unlock()
}
}

0 comments on commit 5240a8f

Please sign in to comment.