Skip to content

Commit

Permalink
Rework DB.DropPrefix
Browse files Browse the repository at this point in the history
Fixes three issues with the current implementation:

- It can generate compaction requests that break the invariant
  that bottom tables need to be consecutive (issue #1380)
- It performs the same-level compactions in increasing order
  of levels (starting from L0), which leads to old versions
  of keys for the prefix re-surfacing to active transactions
- When you have to drop multiple prefixes, the API forces
  you to drop one prefix at a time and go through the whole
  expensive table rewriting multiple times.
  • Loading branch information
damz committed Jun 22, 2020
1 parent d37ce36 commit 4c4867d
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 57 deletions.
16 changes: 8 additions & 8 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
defer b.Close()
var vp valuePointer
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
continue
}
vs := iter.Value()
Expand All @@ -987,9 +987,9 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
}

type flushTask struct {
mt *skl.Skiplist
vptr valuePointer
dropPrefix []byte
mt *skl.Skiplist
vptr valuePointer
dropPrefixes [][]byte
}

// handleFlushTask must be run serially.
Expand Down Expand Up @@ -1618,7 +1618,7 @@ func (db *DB) dropAll() (func(), error) {
// - Compact the levels from the bottom up, Li->Li, picking tables which have Kp.
// - Compact L0->L1 last, skipping over Kp.
// - Resume memtable flushes, compactions and writes.
func (db *DB) DropPrefix(prefix []byte) error {
func (db *DB) DropPrefix(prefixes ...[]byte) error {
db.opt.Infof("DropPrefix Called")
f, err := db.prepareToDrop()
if err != nil {
Expand All @@ -1638,8 +1638,8 @@ func (db *DB) DropPrefix(prefix []byte) error {
task := flushTask{
mt: memtable,
// Ensure that the head of value log gets persisted to disk.
vptr: db.vhead,
dropPrefix: prefix,
vptr: db.vhead,
dropPrefixes: prefixes,
}
db.opt.Debugf("Flushing memtable")
if err := db.handleFlushTask(task); err != nil {
Expand All @@ -1654,7 +1654,7 @@ func (db *DB) DropPrefix(prefix []byte) error {
db.mt = skl.NewSkiplist(arenaSize(db.opt))

// Drop prefixes from the levels.
if err := db.lc.dropPrefix(prefix); err != nil {
if err := db.lc.dropPrefixes(prefixes); err != nil {
return err
}
db.opt.Infof("DropPrefix done")
Expand Down
133 changes: 84 additions & 49 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,19 @@ func (s *levelsController) dropTree() (int, error) {
// tables which only have keys with this prefix are quickly dropped. The ones which have other keys
// are run through MergeIterator and compacted to create new tables. All the mechanisms of
// compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow.
func (s *levelsController) dropPrefix(prefix []byte) error {
func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
// Internal move keys related to the given prefixes should also be skipped.
for _, prefix := range prefixes {
key := make([]byte, 0, len(badgerMove)+len(prefix))
key = append(key, badgerMove...)
key = append(key, prefix...)
prefixes = append(prefixes, key)
}

opt := s.kv.opt
for _, l := range s.levels {
for i := len(s.levels) - 1; i >= 0; i-- {
l := s.levels[i]

l.RLock()
if l.level == 0 {
size := len(l.tables)
Expand All @@ -288,7 +298,7 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
score: 1.74,
// A unique number greater than 1.0 does two things. Helps identify this
// function in logs, and forces a compaction.
dropPrefix: prefix,
dropPrefixes: prefixes,
}
if err := s.doCompact(cp); err != nil {
opt.Warningf("While compacting level 0: %v", err)
Expand All @@ -298,39 +308,48 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
continue
}

var tables []*table.Table
// Internal move keys related to the given prefix should also be skipped.
moveKeyForPrefix := append(badgerMove, prefix...)
prefixesToSkip := [][]byte{prefix, moveKeyForPrefix}
for _, table := range l.tables {
var absent bool
switch {
case hasAnyPrefixes(table.Smallest(), prefixesToSkip):
case hasAnyPrefixes(table.Biggest(), prefixesToSkip):
case containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixesToSkip):
default:
absent = true
// Build a list of compaction operations affecting all the prefixes we
// need to drop. We need to build operations that satisfy the invariant that
// bottom tables are consecutive.
var operations [][]*table.Table
var currentOperation []*table.Table

nextOperation := func() {
if len(currentOperation) > 0 {
operations = append(operations, currentOperation)
currentOperation = nil
}
if !absent {
tables = append(tables, table)
}

for _, table := range l.tables {
if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) {
currentOperation = append(currentOperation, table)
} else {
nextOperation()
}
}
nextOperation()

l.RUnlock()
if len(tables) == 0 {

if len(operations) == 0 {
continue
}

cd := compactDef{
elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"),
thisLevel: l,
nextLevel: l,
top: []*table.Table{},
bot: tables,
dropPrefix: prefix,
}
if err := s.runCompactDef(l.level, cd); err != nil {
opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
return err
opt.Infof("Dropping prefix at level %d (%d operations)", l.level, len(operations))
for _, operation := range operations {
cd := compactDef{
elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"),
thisLevel: l,
nextLevel: l,
top: nil,
bot: operation,
dropPrefixes: prefixes,
}
if err := s.runCompactDef(l.level, cd); err != nil {
opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
return err
}
}
}
return nil
Expand Down Expand Up @@ -395,9 +414,9 @@ func (l *levelHandler) isCompactable(delSize int64) bool {
}

type compactionPriority struct {
level int
score float64
dropPrefix []byte
level int
score float64
dropPrefixes [][]byte
}

// pickCompactLevel determines which level to compact.
Expand Down Expand Up @@ -491,13 +510,18 @@ func (s *levelsController) compactBuildTables(

// Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
var valid []*table.Table

nextTable:
for _, table := range botTables {
if len(cd.dropPrefix) > 0 &&
bytes.HasPrefix(table.Smallest(), cd.dropPrefix) &&
bytes.HasPrefix(table.Biggest(), cd.dropPrefix) {
// All the keys in this table have the dropPrefix. So, this table does not need to be
// in the iterator and can be dropped immediately.
continue
if len(cd.dropPrefixes) > 0 {
for _, prefix := range cd.dropPrefixes {
if bytes.HasPrefix(table.Smallest(), prefix) &&
bytes.HasPrefix(table.Biggest(), prefix) {
// All the keys in this table have the dropPrefix. So, this table does not need to be
// in the iterator and can be dropped immediately.
continue nextTable
}
}
}
valid = append(valid, table)
}
Expand Down Expand Up @@ -535,12 +559,9 @@ func (s *levelsController) compactBuildTables(
bopts.BfCache = s.kv.bfCache
builder := table.NewTableBuilder(bopts)
var numKeys, numSkips uint64
// Internal move keys related to the given prefix should also be skipped.
moveKeyForPrefix := append(badgerMove, cd.dropPrefix...)
prefixesToSkip := [][]byte{cd.dropPrefix, moveKeyForPrefix}
for ; it.Valid(); it.Next() {
// See if we need to skip the prefix.
if len(cd.dropPrefix) > 0 && hasAnyPrefixes(it.Key(), prefixesToSkip) {
if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
numSkips++
updateStats(it.Value())
continue
Expand Down Expand Up @@ -719,10 +740,24 @@ func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
return false
}

// containsPrefix reports whether the key range bounded by smallValue and
// largeValue may hold keys that start with prefix. It returns true when either
// bound itself starts with prefix, or when prefix sorts strictly between the
// two bounds; otherwise it returns false.
func containsPrefix(smallValue, largeValue []byte, prefix []byte) bool {
	switch {
	case bytes.HasPrefix(smallValue, prefix), bytes.HasPrefix(largeValue, prefix):
		// One of the range boundaries is itself a key with the prefix.
		return true
	default:
		// Neither boundary carries the prefix; keys with the prefix can only
		// be present if the prefix falls strictly inside the range.
		afterSmallest := bytes.Compare(smallValue, prefix) < 0
		return afterSmallest && bytes.Compare(largeValue, prefix) > 0
	}
}

func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool {
for _, prefix := range listOfPrefixes {
if bytes.Compare(prefix, smallValue) > 0 &&
bytes.Compare(prefix, largeValue) < 0 {
if containsPrefix(smallValue, largeValue, prefix) {
return true
}
}
Expand All @@ -744,7 +779,7 @@ type compactDef struct {

thisSize int64

dropPrefix []byte
dropPrefixes [][]byte
}

func (cd *compactDef) lockLevels() {
Expand Down Expand Up @@ -918,10 +953,10 @@ func (s *levelsController) doCompact(p compactionPriority) error {
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.

cd := compactDef{
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
thisLevel: s.levels[l],
nextLevel: s.levels[l+1],
dropPrefix: p.dropPrefix,
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
thisLevel: s.levels[l],
nextLevel: s.levels[l+1],
dropPrefixes: p.dropPrefixes,
}
cd.elog.SetMaxEvents(100)
defer cd.elog.Finish()
Expand Down

0 comments on commit 4c4867d

Please sign in to comment.