Skip to content

Commit

Permalink
Item::ValueCopy
Browse files Browse the repository at this point in the history
Add a ValueCopy function to avoid deadlocks in long running iterations.
Write Item::Value to a passed in byte slice.
  • Loading branch information
manishrjain committed Nov 13, 2017
1 parent 3deb65d commit b3568eb
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 9 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -441,11 +441,12 @@ and [#315](https://github.com/dgraph-io/badger/issues/315).

There are multiple workarounds during iteration:

1. Use `Item::ValueCopy` instead of `Item::Value` when retrieving value.
1. Set `Prefetch` to true. Badger would then copy over the value and release the
file lock immediately.
2. When `Prefetch` is false, don't call `Item::Value` and do a pure key-only
1. When `Prefetch` is false, don't call `Item::Value` and do a pure key-only
iteration. This might be useful if you just want to delete a lot of keys.
3. Do the writes in a separate transaction after the reads.
1. Do the writes in a separate transaction after the reads.

- **My writes are really slow. Why?**

Expand Down
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ func (db *DB) PurgeOlderVersions() error {
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
if !bytes.Equal(lastKey, item.Key()) {
lastKey = y.Safecopy(lastKey, item.Key())
lastKey = y.SafeCopy(lastKey, item.Key())
continue
}
// Found an older version. Mark for deletion
Expand Down
60 changes: 60 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func getItemValue(t *testing.T, item *Item) (val []byte) {
if v == nil {
return nil
}
another, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, v, another)
return v
}

Expand Down Expand Up @@ -1233,3 +1236,60 @@ func TestCreateDirs(t *testing.T) {
_, err = os.Stat(dir)
require.NoError(t, err)
}

func TestWriteDeadlock(t *testing.T) {
dir, err := ioutil.TempDir("", "badger")
fmt.Println(dir)
require.NoError(t, err)
defer os.RemoveAll(dir)

opt := DefaultOptions
opt.Dir = dir
opt.ValueDir = dir
opt.ValueLogFileSize = 10 << 20
db, err := Open(opt)
require.NoError(t, err)

print := func(count *int) {
*count++
if *count%100 == 0 {
fmt.Printf("%05d\r", *count)
}
}

var count int
val := make([]byte, 10000)
require.NoError(t, db.Update(func(txn *Txn) error {
for i := 0; i < 1500; i++ {
key := fmt.Sprintf("%d", i)
rand.Read(val)
require.NoError(t, txn.Set([]byte(key), val))
print(&count)
}
return nil
}))

count = 0
fmt.Println("\nWrites done. Iteration and updates starting...")
err = db.Update(func(txn *Txn) error {
opt := DefaultIteratorOptions
opt.PrefetchValues = false
it := txn.NewIterator(opt)
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()

// Using Value() would cause deadlock.
// item.Value()
out, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, len(val), len(out))

key := y.Copy(item.Key())
rand.Read(val)
require.NoError(t, txn.Set(key, val))
print(&count)
}
return nil
})
require.NoError(t, err)
}
22 changes: 19 additions & 3 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ func (item *Item) Value() ([]byte, error) {
return buf, err
}

// ValueCopy returns a copy of the value of the item from the value log, writing it to dst slice.
// If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
// returned. Tip: It might make sense to reuse the returned slice as dst argument for the next call.
//
// This function is useful in long running iterate/update transactions to avoid a write deadlock.
// See Github issue: https://github.com/dgraph-io/badger/issues/315
func (item *Item) ValueCopy(dst []byte) ([]byte, error) {
item.wg.Wait()
if item.status == prefetched {
return y.SafeCopy(dst, item.val), item.err
}
buf, cb, err := item.yieldItemValue()
defer runCallback(cb)
return y.SafeCopy(dst, buf), err
}

func (item *Item) hasValue() bool {
if item.meta == 0 && item.vptr == nil {
// key not found
Expand Down Expand Up @@ -374,7 +390,7 @@ func (it *Iterator) parseItem() bool {
// Consider keys: a 5, b 7 (del), b 5. When iterating, lastKey = a.
// Then we see b 7, which is deleted. If we don't store lastKey = b, we'll then return b 5,
// which is wrong. Therefore, update lastKey here.
it.lastKey = y.Safecopy(it.lastKey, mi.Key())
it.lastKey = y.SafeCopy(it.lastKey, mi.Key())
}

FILL:
Expand Down Expand Up @@ -414,9 +430,9 @@ func (it *Iterator) fill(item *Item) {
item.expiresAt = vs.ExpiresAt

item.version = y.ParseTs(it.iitr.Key())
item.key = y.Safecopy(item.key, y.ParseKey(it.iitr.Key()))
item.key = y.SafeCopy(item.key, y.ParseKey(it.iitr.Key()))

item.vptr = y.Safecopy(item.vptr, vs.Value)
item.vptr = y.SafeCopy(item.vptr, vs.Value)
item.val = nil
if it.opt.PrefetchValues {
item.wg.Add(1)
Expand Down
2 changes: 1 addition & 1 deletion table/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (itr *blockIterator) parseKV(h header) {
itr.pos, h.klen, h.vlen, len(itr.data), h)
return
}
itr.val = y.Safecopy(itr.val, itr.data[itr.pos:itr.pos+uint32(h.vlen)])
itr.val = y.SafeCopy(itr.val, itr.data[itr.pos:itr.pos+uint32(h.vlen)])
itr.pos += uint32(h.vlen)
}

Expand Down
4 changes: 2 additions & 2 deletions y/y.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func OpenTruncFile(filename string, sync bool) (*os.File, error) {
return os.OpenFile(filename, flags, 0666)
}

// Safecopy does append(a[:0], src...).
func Safecopy(a []byte, src []byte) []byte {
// SafeCopy does append(a[:0], src...).
func SafeCopy(a []byte, src []byte) []byte {
return append(a[:0], src...)
}

Expand Down

0 comments on commit b3568eb

Please sign in to comment.