Skip to content

Commit

Permalink
Update dgraph-maintenance branch
Browse files Browse the repository at this point in the history
This PR adds the following changes from **Master branch to the dgraph-maintenance branch**.
```
25fd0ef Update ristretto to commit f66de99 (#1391)
fac972f Update head while replaying value log (#1372)
e5fd05a Rework DB.DropPrefix (#1381)
adeb842 Fix assert in background compression and encryption. (#1366)
```

The **master and dgraph-maintenance branches** have the following differences:
`+` (in green) means the commit is missing on dgraph-maintenance
`-` (in red) means the commit exists on dgraph-maintenance.
```diff
+ 079f5ae DefaultOptions: Set KeepL0InMemory to false (#1345)
+ 7e19cac Add immudb to the project list (#1341)
+ da80eb9 Iterator: Always add key to txn.reads (#1328)
+ a7e239e StreamWriter: Close head writer (#1347)
+ 543f353 Fix build on golang tip (#1355)
+ fd89894 Compaction: Expired keys and delete markers are never purged (#1354)
+ 056d859 Support disabling conflict detection (#1344)
+ b762832 Tests: Do not leave behind state goroutines (#1349)
+ b2267c2 Restore: Account for value size as well (#1358)
+ 14386ac GC: Consider size of value while rewriting (#1357)
+ c45d966 Fix assert in background compression and encryption. (#1366)
+ dd332b0 Avoid panic in filltables() (#1365)
+ 3f4761d Force KeepL0InMemory to be true when InMemory is true (#1375)
+ d37ce36 Tests: Use t.Parallel in TestIteratePrefix tests  (#1377)
+ 158d927 Remove second initialization of writech in Open (#1382)
+ 675efcd Increase default valueThreshold from 32B to 1KB (#1346)
+ 3042e37 pre allocate cache key for the block cache and the bloom filter cache (#1371)
+ e013bfd Rework DB.DropPrefix (#1381)
+ 509de73 Update head while replaying value log (#1372)
+ 09dfa66 Update ristretto to commit f66de99 (#1391)
+ 717b89c Enable cross-compiled 32bit tests on TravisCI (#1392)
```

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/dgraph-io/badger/1398)
<!-- Reviewable:end -->
  • Loading branch information
Ibrahim Jarif authored Jul 8, 2020
2 parents e7b6e76 + 25fd0ef commit 3b7d067
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 76 deletions.
26 changes: 16 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
} else {
nv = vp.Encode()
meta = meta | bitValuePointer
// Update vhead. If the crash happens while replay was in progress
// and the head is not updated, we will end up replaying all the
// files again.
db.updateHead([]valuePointer{vp})
}

v := y.ValueStruct{
Expand Down Expand Up @@ -638,6 +642,8 @@ func (db *DB) get(key []byte) (y.ValueStruct, error) {
return db.lc.get(key, maxVs, 0)
}

// updateHead should not be called without the db.Lock() since db.vhead is used
// by the writer go routines and memtable flushing goroutine.
func (db *DB) updateHead(ptrs []valuePointer) {
var ptr valuePointer
for i := len(ptrs) - 1; i >= 0; i-- {
Expand All @@ -651,8 +657,6 @@ func (db *DB) updateHead(ptrs []valuePointer) {
return
}

db.Lock()
defer db.Unlock()
y.AssertTrue(!ptr.Less(db.vhead))
db.vhead = ptr
}
Expand Down Expand Up @@ -751,7 +755,9 @@ func (db *DB) writeRequests(reqs []*request) error {
done(err)
return errors.Wrap(err, "writeRequests")
}
db.Lock()
db.updateHead(b.Ptrs)
db.Unlock()
}
done(nil)
db.opt.Debugf("%d entries written", count)
Expand Down Expand Up @@ -940,7 +946,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
defer b.Close()
var vp valuePointer
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
continue
}
vs := iter.Value()
Expand All @@ -953,9 +959,9 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
}

type flushTask struct {
mt *skl.Skiplist
vptr valuePointer
dropPrefix []byte
mt *skl.Skiplist
vptr valuePointer
dropPrefixes [][]byte
}

// handleFlushTask must be run serially.
Expand Down Expand Up @@ -1584,7 +1590,7 @@ func (db *DB) dropAll() (func(), error) {
// - Compact L0->L1, skipping over Kp.
// - Compact rest of the levels, Li->Li, picking tables which have Kp.
// - Resume memtable flushes, compactions and writes.
func (db *DB) DropPrefix(prefix []byte) error {
func (db *DB) DropPrefix(prefixes ...[]byte) error {
db.opt.Infof("DropPrefix Called")
f, err := db.prepareToDrop()
if err != nil {
Expand All @@ -1604,8 +1610,8 @@ func (db *DB) DropPrefix(prefix []byte) error {
task := flushTask{
mt: memtable,
// Ensure that the head of value log gets persisted to disk.
vptr: db.vhead,
dropPrefix: prefix,
vptr: db.vhead,
dropPrefixes: prefixes,
}
db.opt.Debugf("Flushing memtable")
if err := db.handleFlushTask(task); err != nil {
Expand All @@ -1620,7 +1626,7 @@ func (db *DB) DropPrefix(prefix []byte) error {
db.mt = skl.NewSkiplist(arenaSize(db.opt))

// Drop prefixes from the levels.
if err := db.lc.dropPrefix(prefix); err != nil {
if err := db.lc.dropPrefixes(prefixes); err != nil {
return err
}
db.opt.Infof("DropPrefix done")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU=
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down
141 changes: 92 additions & 49 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,25 @@ func (s *levelsController) dropTree() (int, error) {
// tables who only have keys with this prefix are quickly dropped. The ones which have other keys
// are run through MergeIterator and compacted to create new tables. All the mechanisms of
// compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow.
func (s *levelsController) dropPrefix(prefix []byte) error {
func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
// Internal move keys related to the given prefix should also be skipped.
for _, prefix := range prefixes {
key := make([]byte, 0, len(badgerMove)+len(prefix))
key = append(key, badgerMove...)
key = append(key, prefix...)
prefixes = append(prefixes, key)
}

opt := s.kv.opt
for _, l := range s.levels {
// Iterate levels in the reverse order because if we were to iterate from
// lower level (say level 0) to a higher level (say level 3) we could have
// a state in which level 0 is compacted and an older version of a key exists in lower level.
// At this point, if someone creates an iterator, they would see an old
// value for a key from lower levels. Iterating in reverse order ensures we
// drop the oldest data first so that lookups never return stale data.
for i := len(s.levels) - 1; i >= 0; i-- {
l := s.levels[i]

l.RLock()
if l.level == 0 {
size := len(l.tables)
Expand All @@ -288,7 +304,7 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
score: 1.74,
// A unique number greater than 1.0 does two things. Helps identify this
// function in logs, and forces a compaction.
dropPrefix: prefix,
dropPrefixes: prefixes,
}
if err := s.doCompact(cp); err != nil {
opt.Warningf("While compacting level 0: %v", err)
Expand All @@ -298,39 +314,49 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
continue
}

var tables []*table.Table
// Internal move keys related to the given prefix should also be skipped.
moveKeyForPrefix := append(badgerMove, prefix...)
prefixesToSkip := [][]byte{prefix, moveKeyForPrefix}
for _, table := range l.tables {
var absent bool
switch {
case hasAnyPrefixes(table.Smallest(), prefixesToSkip):
case hasAnyPrefixes(table.Biggest(), prefixesToSkip):
case containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixesToSkip):
default:
absent = true
// Build a list of compaction tableGroups affecting all the prefixes we
// need to drop. We need to build tableGroups that satisfy the invariant that
// bottom tables are consecutive.
// tableGroup contains groups of consecutive tables.
var tableGroups [][]*table.Table
var tableGroup []*table.Table

finishGroup := func() {
if len(tableGroup) > 0 {
tableGroups = append(tableGroups, tableGroup)
tableGroup = nil
}
if !absent {
tables = append(tables, table)
}

for _, table := range l.tables {
if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) {
tableGroup = append(tableGroup, table)
} else {
finishGroup()
}
}
finishGroup()

l.RUnlock()
if len(tables) == 0 {

if len(tableGroups) == 0 {
continue
}

cd := compactDef{
elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"),
thisLevel: l,
nextLevel: l,
top: []*table.Table{},
bot: tables,
dropPrefix: prefix,
}
if err := s.runCompactDef(l.level, cd); err != nil {
opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
return err
opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups))
for _, operation := range tableGroups {
cd := compactDef{
elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"),
thisLevel: l,
nextLevel: l,
top: nil,
bot: operation,
dropPrefixes: prefixes,
}
if err := s.runCompactDef(l.level, cd); err != nil {
opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
return err
}
}
}
return nil
Expand Down Expand Up @@ -395,9 +421,9 @@ func (l *levelHandler) isCompactable(delSize int64) bool {
}

type compactionPriority struct {
level int
score float64
dropPrefix []byte
level int
score float64
dropPrefixes [][]byte
}

// pickCompactLevel determines which level to compact.
Expand Down Expand Up @@ -491,13 +517,19 @@ func (s *levelsController) compactBuildTables(

// Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
var valid []*table.Table

nextTable:
for _, table := range botTables {
if len(cd.dropPrefix) > 0 &&
bytes.HasPrefix(table.Smallest(), cd.dropPrefix) &&
bytes.HasPrefix(table.Biggest(), cd.dropPrefix) {
// All the keys in this table have the dropPrefix. So, this table does not need to be
// in the iterator and can be dropped immediately.
continue
if len(cd.dropPrefixes) > 0 {
for _, prefix := range cd.dropPrefixes {
if bytes.HasPrefix(table.Smallest(), prefix) &&
bytes.HasPrefix(table.Biggest(), prefix) {
// All the keys in this table have the dropPrefix. So, this
// table does not need to be in the iterator and can be
// dropped immediately.
continue nextTable
}
}
}
valid = append(valid, table)
}
Expand Down Expand Up @@ -535,12 +567,9 @@ func (s *levelsController) compactBuildTables(
bopts.BfCache = s.kv.bfCache
builder := table.NewTableBuilder(bopts)
var numKeys, numSkips uint64
// Internal move keys related to the given prefix should also be skipped.
moveKeyForPrefix := append(badgerMove, cd.dropPrefix...)
prefixesToSkip := [][]byte{cd.dropPrefix, moveKeyForPrefix}
for ; it.Valid(); it.Next() {
// See if we need to skip the prefix.
if len(cd.dropPrefix) > 0 && hasAnyPrefixes(it.Key(), prefixesToSkip) {
if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
numSkips++
updateStats(it.Value())
continue
Expand Down Expand Up @@ -715,10 +744,24 @@ func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
return false
}

// containsPrefix reports whether the inclusive key range
// [smallValue, largeValue] may hold a key that starts with prefix.
// It is true when either endpoint itself carries the prefix, or when
// the prefix sorts strictly between the two endpoints (a conservative
// check: some key with that prefix could lie inside the range).
func containsPrefix(smallValue, largeValue, prefix []byte) bool {
	if bytes.HasPrefix(smallValue, prefix) || bytes.HasPrefix(largeValue, prefix) {
		return true
	}
	// Neither endpoint starts with the prefix; the range can still cover
	// prefixed keys only if the prefix falls strictly inside the range.
	return bytes.Compare(prefix, smallValue) > 0 && bytes.Compare(prefix, largeValue) < 0
}

func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool {
for _, prefix := range listOfPrefixes {
if bytes.Compare(prefix, smallValue) > 0 &&
bytes.Compare(prefix, largeValue) < 0 {
if containsPrefix(smallValue, largeValue, prefix) {
return true
}
}
Expand All @@ -740,7 +783,7 @@ type compactDef struct {

thisSize int64

dropPrefix []byte
dropPrefixes [][]byte
}

func (cd *compactDef) lockLevels() {
Expand Down Expand Up @@ -902,10 +945,10 @@ func (s *levelsController) doCompact(p compactionPriority) error {
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.

cd := compactDef{
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
thisLevel: s.levels[l],
nextLevel: s.levels[l+1],
dropPrefix: p.dropPrefix,
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
thisLevel: s.levels[l],
nextLevel: s.levels[l+1],
dropPrefixes: p.dropPrefixes,
}
cd.elog.SetMaxEvents(100)
defer cd.elog.Finish()
Expand Down
40 changes: 40 additions & 0 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,3 +564,43 @@ func TestL0Stall(t *testing.T) {
test(t, &opt)
})
}

// Regression test for https://github.com/dgraph-io/dgraph/issues/5573
//
// Verifies that DropPrefix removes both the user keys and the internal
// !badger!move keys for the dropped prefix from every level of the tree.
func TestDropPrefixMoveBug(t *testing.T) {
	runBadgerTest(t, nil, func(t *testing.T, db *DB) {
		// l1 is used to verify that drop prefix actually drops move keys from all the levels.
		l1 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}}
		createAndOpen(db, l1, 1)

		// Multiple levels can have the exact same move key with version.
		l2 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}, {"A", "", 0, 0}}
		l21 := []keyValVersion{{"B", "", 0, 0}, {"C", "", 0, 0}}
		l22 := []keyValVersion{{"F", "", 0, 0}, {"G", "", 0, 0}}

		// Level 2 has all the tables.
		createAndOpen(db, l2, 2)
		createAndOpen(db, l21, 2)
		createAndOpen(db, l22, 2)

		require.NoError(t, db.lc.validate())
		require.NoError(t, db.DropPrefix([]byte("F")))

		// Check the error returned by View: previously it was discarded, so a
		// failure inside the closure (or in View itself) could go unnoticed.
		err := db.View(func(txn *Txn) error {
			iopt := DefaultIteratorOptions
			iopt.AllVersions = true

			it := txn.NewIterator(iopt)
			defer it.Close()

			specialKey := []byte("F")
			droppedPrefixes := [][]byte{specialKey, append(badgerMove, specialKey...)}
			for it.Rewind(); it.Valid(); it.Next() {
				key := it.Item().Key()
				// Ensure we don't have any "F" or "!badger!move!F" left.
				require.False(t, hasAnyPrefixes(key, droppedPrefixes))
			}
			return nil
		})
		require.NoError(t, err)
		require.NoError(t, db.lc.validate())
	})
}
14 changes: 7 additions & 7 deletions table/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,13 @@ func (b *Builder) handleBlock() {
blockBuf = eBlock
}

// The newend should always be less than or equal to the original end
// plus the padding. If the new end is greater than item.end+padding
// that means the data from this block cannot be stored in its existing
// location and trying to copy it over would mean we would over-write
// some data of the next block.
y.AssertTruef(uint32(len(blockBuf)) <= item.end+padding,
"newend: %d item.end: %d padding: %d", len(blockBuf), item.end, padding)
// The blockBuf should always be less than or equal to the allocated space. If it is greater
// than allocated space that means the data from this block cannot be stored in its
// existing location and trying to copy it over would mean we would over-write some data
// of the next block.
allocatedSpace := (item.end - item.start) + padding + 1
y.AssertTruef(uint32(len(blockBuf)) <= allocatedSpace, "newend: %d oldend: %d padding: %d",
item.start+uint32(len(blockBuf)), item.end, padding)

// Acquire the buflock here. The builder.grow function might change
// the b.buf while this goroutine was running.
Expand Down
Loading

0 comments on commit 3b7d067

Please sign in to comment.