diff --git a/db.go b/db.go index 79b332d23..fc908dbf5 100644 --- a/db.go +++ b/db.go @@ -136,6 +136,10 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { } else { nv = vp.Encode() meta = meta | bitValuePointer + // Update vhead. If the crash happens while replay was in progress + // and the head is not updated, we will end up replaying all the + // files again. + db.updateHead([]valuePointer{vp}) } v := y.ValueStruct{ @@ -638,6 +642,8 @@ func (db *DB) get(key []byte) (y.ValueStruct, error) { return db.lc.get(key, maxVs, 0) } +// updateHead should not be called without the db.Lock() since db.vhead is used +// by the writer goroutines and memtable flushing goroutine. func (db *DB) updateHead(ptrs []valuePointer) { var ptr valuePointer for i := len(ptrs) - 1; i >= 0; i-- { @@ -651,8 +657,6 @@ func (db *DB) updateHead(ptrs []valuePointer) { return } - db.Lock() - defer db.Unlock() y.AssertTrue(!ptr.Less(db.vhead)) db.vhead = ptr } @@ -751,7 +755,9 @@ func (db *DB) writeRequests(reqs []*request) error { done(err) return errors.Wrap(err, "writeRequests") } + db.Lock() db.updateHead(b.Ptrs) + db.Unlock() } done(nil) db.opt.Debugf("%d entries written", count) @@ -940,7 +946,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte { defer b.Close() var vp valuePointer for iter.SeekToFirst(); iter.Valid(); iter.Next() { - if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) { + if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) { continue } vs := iter.Value() @@ -953,9 +959,9 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte { } type flushTask struct { - mt *skl.Skiplist - vptr valuePointer - dropPrefix []byte + mt *skl.Skiplist + vptr valuePointer + dropPrefixes [][]byte } // handleFlushTask must be run serially. @@ -1584,7 +1590,7 @@ func (db *DB) dropAll() (func(), error) { // - Compact L0->L1, skipping over Kp. // - Compact rest of the levels, Li->Li, picking tables which have Kp. 
// - Resume memtable flushes, compactions and writes. -func (db *DB) DropPrefix(prefix []byte) error { +func (db *DB) DropPrefix(prefixes ...[]byte) error { db.opt.Infof("DropPrefix Called") f, err := db.prepareToDrop() if err != nil { @@ -1604,8 +1610,8 @@ func (db *DB) DropPrefix(prefix []byte) error { task := flushTask{ mt: memtable, // Ensure that the head of value log gets persisted to disk. - vptr: db.vhead, - dropPrefix: prefix, + vptr: db.vhead, + dropPrefixes: prefixes, } db.opt.Debugf("Flushing memtable") if err := db.handleFlushTask(task); err != nil { @@ -1620,7 +1626,7 @@ func (db *DB) DropPrefix(prefix []byte) error { db.mt = skl.NewSkiplist(arenaSize(db.opt)) // Drop prefixes from the levels. - if err := db.lc.dropPrefix(prefix); err != nil { + if err := db.lc.dropPrefixes(prefixes); err != nil { return err } db.opt.Infof("DropPrefix done") diff --git a/go.mod b/go.mod index eae04e485..6cb85b77c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 + github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 diff --git a/go.sum b/go.sum index 4c71dbdf4..a4aa207f9 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU= -github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod 
h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA= +github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= diff --git a/levels.go b/levels.go index 764fcd944..e52072f76 100644 --- a/levels.go +++ b/levels.go @@ -274,9 +274,25 @@ func (s *levelsController) dropTree() (int, error) { // tables who only have keys with this prefix are quickly dropped. The ones which have other keys // are run through MergeIterator and compacted to create new tables. All the mechanisms of // compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow. -func (s *levelsController) dropPrefix(prefix []byte) error { +func (s *levelsController) dropPrefixes(prefixes [][]byte) error { + // Internal move keys related to the given prefix should also be skipped. + for _, prefix := range prefixes { + key := make([]byte, 0, len(badgerMove)+len(prefix)) + key = append(key, badgerMove...) + key = append(key, prefix...) + prefixes = append(prefixes, key) + } + opt := s.kv.opt - for _, l := range s.levels { + // Iterate levels in the reverse order because if we were to iterate from + // lower level (say level 0) to a higher level (say level 3) we could have + // a state in which level 0 is compacted and an older version of a key exists in lower level. + // At this point, if someone creates an iterator, they would see an old + // value for a key from lower levels. Iterating in reverse order ensures we + // drop the oldest data first so that lookups never return stale data. 
+ for i := len(s.levels) - 1; i >= 0; i-- { + l := s.levels[i] + l.RLock() if l.level == 0 { size := len(l.tables) @@ -288,7 +304,7 @@ func (s *levelsController) dropPrefix(prefix []byte) error { score: 1.74, // A unique number greater than 1.0 does two things. Helps identify this // function in logs, and forces a compaction. - dropPrefix: prefix, + dropPrefixes: prefixes, } if err := s.doCompact(cp); err != nil { opt.Warningf("While compacting level 0: %v", err) @@ -298,39 +314,49 @@ func (s *levelsController) dropPrefix(prefix []byte) error { continue } - var tables []*table.Table - // Internal move keys related to the given prefix should also be skipped. - moveKeyForPrefix := append(badgerMove, prefix...) - prefixesToSkip := [][]byte{prefix, moveKeyForPrefix} - for _, table := range l.tables { - var absent bool - switch { - case hasAnyPrefixes(table.Smallest(), prefixesToSkip): - case hasAnyPrefixes(table.Biggest(), prefixesToSkip): - case containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixesToSkip): - default: - absent = true + // Build a list of compaction tableGroups affecting all the prefixes we + // need to drop. We need to build tableGroups that satisfy the invariant that + // bottom tables are consecutive. + // tableGroup contains groups of consecutive tables. 
+ var tableGroups [][]*table.Table + var tableGroup []*table.Table + + finishGroup := func() { + if len(tableGroup) > 0 { + tableGroups = append(tableGroups, tableGroup) + tableGroup = nil } - if !absent { - tables = append(tables, table) + } + + for _, table := range l.tables { + if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) { + tableGroup = append(tableGroup, table) + } else { + finishGroup() } } + finishGroup() + l.RUnlock() - if len(tables) == 0 { + + if len(tableGroups) == 0 { continue } - cd := compactDef{ - elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"), - thisLevel: l, - nextLevel: l, - top: []*table.Table{}, - bot: tables, - dropPrefix: prefix, - } - if err := s.runCompactDef(l.level, cd); err != nil { - opt.Warningf("While running compact def: %+v. Error: %v", cd, err) - return err + opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups)) + for _, operation := range tableGroups { + cd := compactDef{ + elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"), + thisLevel: l, + nextLevel: l, + top: nil, + bot: operation, + dropPrefixes: prefixes, + } + if err := s.runCompactDef(l.level, cd); err != nil { + opt.Warningf("While running compact def: %+v. Error: %v", cd, err) + return err + } } } return nil @@ -395,9 +421,9 @@ func (l *levelHandler) isCompactable(delSize int64) bool { } type compactionPriority struct { - level int - score float64 - dropPrefix []byte + level int + score float64 + dropPrefixes [][]byte } // pickCompactLevel determines which level to compact. @@ -491,13 +517,19 @@ func (s *levelsController) compactBuildTables( // Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap. var valid []*table.Table + +nextTable: for _, table := range botTables { - if len(cd.dropPrefix) > 0 && - bytes.HasPrefix(table.Smallest(), cd.dropPrefix) && - bytes.HasPrefix(table.Biggest(), cd.dropPrefix) { - // All the keys in this table have the dropPrefix. 
So, this table does not need to be - // in the iterator and can be dropped immediately. - continue + if len(cd.dropPrefixes) > 0 { + for _, prefix := range cd.dropPrefixes { + if bytes.HasPrefix(table.Smallest(), prefix) && + bytes.HasPrefix(table.Biggest(), prefix) { + // All the keys in this table have the dropPrefix. So, this + // table does not need to be in the iterator and can be + // dropped immediately. + continue nextTable + } + } } valid = append(valid, table) } @@ -535,12 +567,9 @@ func (s *levelsController) compactBuildTables( bopts.BfCache = s.kv.bfCache builder := table.NewTableBuilder(bopts) var numKeys, numSkips uint64 - // Internal move keys related to the given prefix should also be skipped. - moveKeyForPrefix := append(badgerMove, cd.dropPrefix...) - prefixesToSkip := [][]byte{cd.dropPrefix, moveKeyForPrefix} for ; it.Valid(); it.Next() { // See if we need to skip the prefix. - if len(cd.dropPrefix) > 0 && hasAnyPrefixes(it.Key(), prefixesToSkip) { + if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) { numSkips++ updateStats(it.Value()) continue @@ -715,10 +744,24 @@ func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool { return false } +func containsPrefix(smallValue, largeValue, prefix []byte) bool { + if bytes.HasPrefix(smallValue, prefix) { + return true + } + if bytes.HasPrefix(largeValue, prefix) { + return true + } + if bytes.Compare(prefix, smallValue) > 0 && + bytes.Compare(prefix, largeValue) < 0 { + return true + } + + return false +} + func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool { for _, prefix := range listOfPrefixes { - if bytes.Compare(prefix, smallValue) > 0 && - bytes.Compare(prefix, largeValue) < 0 { + if containsPrefix(smallValue, largeValue, prefix) { return true } } @@ -740,7 +783,7 @@ type compactDef struct { thisSize int64 - dropPrefix []byte + dropPrefixes [][]byte } func (cd *compactDef) lockLevels() { @@ -902,10 +945,10 @@ func (s *levelsController) 
doCompact(p compactionPriority) error { y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check. cd := compactDef{ - elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), - thisLevel: s.levels[l], - nextLevel: s.levels[l+1], - dropPrefix: p.dropPrefix, + elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), + thisLevel: s.levels[l], + nextLevel: s.levels[l+1], + dropPrefixes: p.dropPrefixes, } cd.elog.SetMaxEvents(100) defer cd.elog.Finish() diff --git a/levels_test.go b/levels_test.go index 8c7df15bb..853572ded 100644 --- a/levels_test.go +++ b/levels_test.go @@ -564,3 +564,43 @@ func TestL0Stall(t *testing.T) { test(t, &opt) }) } + +// Regression test for https://github.com/dgraph-io/dgraph/issues/5573 +func TestDropPrefixMoveBug(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // l1 is used to verify that drop prefix actually drops move keys from all the levels. + l1 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}} + createAndOpen(db, l1, 1) + + // Multiple levels can have the exact same move key with version. + l2 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}, {"A", "", 0, 0}} + l21 := []keyValVersion{{"B", "", 0, 0}, {"C", "", 0, 0}} + l22 := []keyValVersion{{"F", "", 0, 0}, {"G", "", 0, 0}} + + // Level 2 has all the tables. 
+ createAndOpen(db, l2, 2) + createAndOpen(db, l21, 2) + createAndOpen(db, l22, 2) + + require.NoError(t, db.lc.validate()) + require.NoError(t, db.DropPrefix([]byte("F"))) + + db.View(func(txn *Txn) error { + iopt := DefaultIteratorOptions + iopt.AllVersions = true + + it := txn.NewIterator(iopt) + defer it.Close() + + specialKey := []byte("F") + droppedPrefixes := [][]byte{specialKey, append(badgerMove, specialKey...)} + for it.Rewind(); it.Valid(); it.Next() { + key := it.Item().Key() + // Ensure we don't have any "F" or "!badger!move!F" left + require.False(t, hasAnyPrefixes(key, droppedPrefixes)) + } + return nil + }) + require.NoError(t, db.lc.validate()) + }) +} diff --git a/table/builder.go b/table/builder.go index d32d9c6aa..adf064d9a 100644 --- a/table/builder.go +++ b/table/builder.go @@ -149,13 +149,13 @@ func (b *Builder) handleBlock() { blockBuf = eBlock } - // The newend should always be less than or equal to the original end - // plus the padding. If the new end is greater than item.end+padding - // that means the data from this block cannot be stored in its existing - // location and trying to copy it over would mean we would over-write - // some data of the next block. - y.AssertTruef(uint32(len(blockBuf)) <= item.end+padding, - "newend: %d item.end: %d padding: %d", len(blockBuf), item.end, padding) + // BlockBuf should always less than or equal to allocated space. If the blockBuf is greater + // than allocated space that means the data from this block cannot be stored in its + // existing location and trying to copy it over would mean we would over-write some data + // of the next block. + allocatedSpace := (item.end - item.start) + padding + 1 + y.AssertTruef(uint32(len(blockBuf)) <= allocatedSpace, "newend: %d oldend: %d padding: %d", + item.start+uint32(len(blockBuf)), item.end, padding) // Acquire the buflock here. The builder.grow function might change // the b.buf while this goroutine was running. 
diff --git a/util.go b/util.go index b7f173dd3..ccf7939f3 100644 --- a/util.go +++ b/util.go @@ -60,8 +60,8 @@ func (s *levelHandler) validate() error { if y.CompareKeys(s.tables[j].Smallest(), s.tables[j].Biggest()) > 0 { return errors.Errorf( - "Intra: %q vs %q: level=%d j=%d numTables=%d", - s.tables[j].Smallest(), s.tables[j].Biggest(), s.level, j, numTables) + "Intra: \n%s\n vs \n%s\n: level=%d j=%d numTables=%d", + hex.Dump(s.tables[j].Smallest()), hex.Dump(s.tables[j].Biggest()), s.level, j, numTables) } } return nil diff --git a/value_test.go b/value_test.go index 8c3c67a8a..de537837f 100644 --- a/value_test.go +++ b/value_test.go @@ -370,7 +370,6 @@ func TestValueGC4(t *testing.T) { kv, err := Open(opt) require.NoError(t, err) - defer kv.Close() sz := 128 << 10 // 5 entries per value log file. txn := kv.NewTransaction(true) @@ -409,11 +408,9 @@ func TestValueGC4(t *testing.T) { kv.vlog.rewrite(lf0, tr) kv.vlog.rewrite(lf1, tr) - err = kv.vlog.Close() - require.NoError(t, err) + require.NoError(t, kv.Close()) - kv.vlog.init(kv) - err = kv.vlog.open(kv, valuePointer{Fid: 2}, kv.replayFunction()) + kv, err = Open(opt) require.NoError(t, err) for i := 0; i < 8; i++ { @@ -435,6 +432,7 @@ func TestValueGC4(t *testing.T) { return nil })) } + require.NoError(t, kv.Close()) } func TestPersistLFDiscardStats(t *testing.T) { @@ -644,6 +642,11 @@ func TestPartialAppendToValueLog(t *testing.T) { // Replay value log from beginning, badger head is past k2. require.NoError(t, kv.vlog.Close()) + // clean up the current db.vhead so that we can replay from the beginning. + // If we don't clear the current vhead, badger will error out since new + // head passed while opening vlog is zero in the following lines. + kv.vhead = valuePointer{} + kv.vlog.init(kv) require.NoError( t, kv.vlog.open(kv, valuePointer{Fid: 0}, kv.replayFunction()),