Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Item::ValueCopy #316

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -441,11 +441,12 @@ and [#315](https://github.com/dgraph-io/badger/issues/315).

There are multiple workarounds during iteration:

1. Use `Item::ValueCopy` instead of `Item::Value` when retrieving value.
1. Set `Prefetch` to true. Badger would then copy over the value and release the
file lock immediately.
2. When `Prefetch` is false, don't call `Item::Value` and do a pure key-only
1. When `Prefetch` is false, don't call `Item::Value` and do a pure key-only
iteration. This might be useful if you just want to delete a lot of keys.
3. Do the writes in a separate transaction after the reads.
1. Do the writes in a separate transaction after the reads.

- **My writes are really slow. Why?**

Expand Down
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ func (db *DB) PurgeOlderVersions() error {
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
if !bytes.Equal(lastKey, item.Key()) {
lastKey = y.Safecopy(lastKey, item.Key())
lastKey = y.SafeCopy(lastKey, item.Key())
continue
}
// Found an older version. Mark for deletion
Expand Down
60 changes: 60 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func getItemValue(t *testing.T, item *Item) (val []byte) {
if v == nil {
return nil
}
another, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, v, another)
return v
}

Expand Down Expand Up @@ -1233,3 +1236,60 @@ func TestCreateDirs(t *testing.T) {
_, err = os.Stat(dir)
require.NoError(t, err)
}

func TestWriteDeadlock(t *testing.T) {
dir, err := ioutil.TempDir("", "badger")
fmt.Println(dir)
require.NoError(t, err)
defer os.RemoveAll(dir)

opt := DefaultOptions
opt.Dir = dir
opt.ValueDir = dir
opt.ValueLogFileSize = 10 << 20
db, err := Open(opt)
require.NoError(t, err)

print := func(count *int) {
*count++
if *count%100 == 0 {
fmt.Printf("%05d\r", *count)
}
}

var count int
val := make([]byte, 10000)
require.NoError(t, db.Update(func(txn *Txn) error {
for i := 0; i < 1500; i++ {
key := fmt.Sprintf("%d", i)
rand.Read(val)
require.NoError(t, txn.Set([]byte(key), val))
print(&count)
}
return nil
}))

count = 0
fmt.Println("\nWrites done. Iteration and updates starting...")
err = db.Update(func(txn *Txn) error {
opt := DefaultIteratorOptions
opt.PrefetchValues = false
it := txn.NewIterator(opt)
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()

// Using Value() would cause deadlock.
// item.Value()
out, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, len(val), len(out))

key := y.Copy(item.Key())
rand.Read(val)
require.NoError(t, txn.Set(key, val))
print(&count)
}
return nil
})
require.NoError(t, err)
}
22 changes: 19 additions & 3 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ func (item *Item) Value() ([]byte, error) {
return buf, err
}

// ValueCopy returns a copy of the value of the item from the value log, writing it to dst slice.
// If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
// returned. Tip: It might make sense to reuse the returned slice as dst argument for the next call.
//
// This function is useful in long running iterate/update transactions to avoid a write deadlock.
// See Github issue: https://github.com/dgraph-io/badger/issues/315
func (item *Item) ValueCopy(dst []byte) ([]byte, error) {
item.wg.Wait()
if item.status == prefetched {
return y.SafeCopy(dst, item.val), item.err
}
buf, cb, err := item.yieldItemValue()
defer runCallback(cb)
return y.SafeCopy(dst, buf), err
}

func (item *Item) hasValue() bool {
if item.meta == 0 && item.vptr == nil {
// key not found
Expand Down Expand Up @@ -374,7 +390,7 @@ func (it *Iterator) parseItem() bool {
// Consider keys: a 5, b 7 (del), b 5. When iterating, lastKey = a.
// Then we see b 7, which is deleted. If we don't store lastKey = b, we'll then return b 5,
// which is wrong. Therefore, update lastKey here.
it.lastKey = y.Safecopy(it.lastKey, mi.Key())
it.lastKey = y.SafeCopy(it.lastKey, mi.Key())
}

FILL:
Expand Down Expand Up @@ -414,9 +430,9 @@ func (it *Iterator) fill(item *Item) {
item.expiresAt = vs.ExpiresAt

item.version = y.ParseTs(it.iitr.Key())
item.key = y.Safecopy(item.key, y.ParseKey(it.iitr.Key()))
item.key = y.SafeCopy(item.key, y.ParseKey(it.iitr.Key()))

item.vptr = y.Safecopy(item.vptr, vs.Value)
item.vptr = y.SafeCopy(item.vptr, vs.Value)
item.val = nil
if it.opt.PrefetchValues {
item.wg.Add(1)
Expand Down
2 changes: 1 addition & 1 deletion table/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (itr *blockIterator) parseKV(h header) {
itr.pos, h.klen, h.vlen, len(itr.data), h)
return
}
itr.val = y.Safecopy(itr.val, itr.data[itr.pos:itr.pos+uint32(h.vlen)])
itr.val = y.SafeCopy(itr.val, itr.data[itr.pos:itr.pos+uint32(h.vlen)])
itr.pos += uint32(h.vlen)
}

Expand Down
4 changes: 2 additions & 2 deletions y/y.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func OpenTruncFile(filename string, sync bool) (*os.File, error) {
return os.OpenFile(filename, flags, 0666)
}

// Safecopy does append(a[:0], src...).
func Safecopy(a []byte, src []byte) []byte {
// SafeCopy does append(a[:0], src...).
func SafeCopy(a []byte, src []byte) []byte {
return append(a[:0], src...)
}

Expand Down