Skip to content

Commit

Permalink
Revert "fix: Fix race condition in block.incRef (#1337)"
Browse files Browse the repository at this point in the history
This reverts commit 21735af.
  • Loading branch information
Ibrahim Jarif committed Jul 5, 2020
1 parent a0f078a commit 8d68b04
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 70 deletions.
29 changes: 3 additions & 26 deletions badger/cmd/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func toSlice(bal uint64) []byte {
}

func getBalance(txn *badger.Txn, account int) (uint64, error) {
item, err := get(txn, key(account))
item, err := txn.Get(key(account))
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -197,33 +197,14 @@ func diff(a, b []account) string {

var errFailure = errors.New("test failed due to balance mismatch")

// get fetches the item for the key "k", choosing at random between the
// txn.Get API and the iterator.Seek API so that both read paths get exercised.
// On the iterator path it returns badger.ErrKeyNotFound when no entry with
// exactly the key "k" exists.
func get(txn *badger.Txn, k []byte) (*badger.Item, error) {
	if rand.Int()%2 == 0 {
		return txn.Get(k)
	}

	iopt := badger.DefaultIteratorOptions
	// PrefetchValues is expensive. We don't need it here.
	iopt.PrefetchValues = false
	it := txn.NewIterator(iopt)
	defer it.Close()

	// Seek positions a forward iterator at the first key >= k, so a valid
	// iterator does not imply that "k" itself was found. Without the exact key
	// comparison below, a missing key would silently yield the item of the
	// next larger key.
	it.Seek(k)
	if !it.Valid() {
		return nil, badger.ErrKeyNotFound
	}
	item := it.Item()
	// Comparing via string conversion avoids needing the bytes package; the
	// compiler does not allocate for this comparison.
	if string(item.Key()) != string(k) {
		return nil, badger.ErrKeyNotFound
	}
	// NOTE(review): the returned Item comes from an iterator that is closed
	// when this function returns; confirm that Items remain usable after
	// Iterator.Close before relying on item.Value in callers.
	return item, nil
}

// seekTotal retrieves the total of all accounts by seeking for each account key.
func seekTotal(txn *badger.Txn) ([]account, error) {
expected := uint64(numAccounts) * uint64(initialBal)
var accounts []account

var total uint64
for i := 0; i < numAccounts; i++ {
item, err := get(txn, key(i))
item, err := txn.Get(key(i))
if err != nil {
log.Printf("Error for account: %d. err=%v. key=%q\n", i, err, key(i))
return accounts, err
Expand Down Expand Up @@ -362,11 +343,7 @@ func runTest(cmd *cobra.Command, args []string) error {
WithNumMemtables(2).
// Do not GC any versions, because we need them for the disect..
WithNumVersionsToKeep(int(math.MaxInt32)).
WithValueThreshold(1). // Make all values go to value log
WithCompression(options.ZSTD).
WithKeepL0InMemory(false).
WithMaxCacheSize(10 << 20)

WithValueThreshold(1) // Make all values go to value log
if mmap {
opts = opts.WithTableLoadingMode(options.MemoryMap)
}
Expand Down
2 changes: 2 additions & 0 deletions table/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func (itr *blockIterator) setBlock(b *block) {
// Decrement the ref for the old block. If the old block was compressed, we
// might be able to reuse it.
itr.block.decrRef()
// Increment the ref for the new block.
b.incrRef()

itr.block = b
itr.err = nil
Expand Down
52 changes: 8 additions & 44 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,46 +199,25 @@ type block struct {
ref int32
}

// incrRef increments the ref of a block and returns a bool indicating if the
// increment was successful. A true value indicates that the block can be used.
func (b *block) incrRef() bool {
for {
// We can't blindly add 1 to ref. We need to check whether it has
// reached zero first, because if it did, then we should absolutely not
// use this block.
ref := atomic.LoadInt32(&b.ref)
// The ref would not be equal to 0 unless the existing
// block gets evicted before this line. If the ref is zero, it means that
// the block is already added to the blockPool and cannot be used
// anymore. The ref of a new block is 1 so the following condition will
// be true only if the block got reused before we could increment its
// ref.
if ref == 0 {
return false
}
// Increment the ref only if it is not zero and has not changed between
// the time we read it and we're updating it.
//
if atomic.CompareAndSwapInt32(&b.ref, ref, ref+1) {
return true
}
}
func (b *block) incrRef() {
atomic.AddInt32(&b.ref, 1)
}
func (b *block) decrRef() {
if b == nil {
return
}

p := atomic.AddInt32(&b.ref, -1)
// Insert the []byte into pool only if the block is reusable. When a block
// is reusable a new []byte is used for decompression and this []byte can
// be reused.
// In case of an uncompressed block, the []byte is a reference to the
// table.mmap []byte slice. Any attempt to write data to the mmap []byte
// will lead to SEGFAULT.
if atomic.AddInt32(&b.ref, -1) == 0 && b.isReusable {
if p == 0 && b.isReusable {
blockPool.Put(&b.data)
}
y.AssertTrue(atomic.LoadInt32(&b.ref) >= 0)
y.AssertTrue(p >= 0)
}
func (b *block) size() int64 {
return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
Expand Down Expand Up @@ -496,9 +475,6 @@ func calculateOffsetsSize(offsets []*pb.BlockOffset) int64 {
return totalSize + 3*8
}

// block returns a new block. Each block holds a ref and the byte
// slice stored in the block will be reused when the ref becomes zero. The
// caller should release the block by calling block.decrRef() on it.
func (t *Table) block(idx int) (*block, error) {
y.AssertTruef(idx >= 0, "idx=%d", idx)
if idx >= t.noOfBlocks {
Expand All @@ -508,20 +484,14 @@ func (t *Table) block(idx int) (*block, error) {
key := t.blockCacheKey(idx)
blk, ok := t.opt.Cache.Get(key)
if ok && blk != nil {
// Use the block only if the increment was successful. The block
// could get evicted from the cache between the Get() call and the
// incrRef() call.
if b := blk.(*block); b.incrRef() {
return b, nil
}
return blk.(*block), nil
}
}

// Read the block index if it's nil
ko := t.blockOffsets()[idx]
blk := &block{
offset: int(ko.Offset),
ref: 1,
}
var err error
if blk.data, err = t.read(blk.offset, int(ko.Len)); err != nil {
Expand Down Expand Up @@ -578,14 +548,8 @@ func (t *Table) block(idx int) (*block, error) {
}
if t.opt.Cache != nil && t.opt.KeepBlocksInCache {
key := t.blockCacheKey(idx)
// incrRef should never return false here because we're calling it on a
// new block with ref=1.
y.AssertTrue(blk.incrRef())

// Decrement the block ref if we could not insert it in the cache.
if !t.opt.Cache.Set(key, blk, blk.size()) {
blk.decrRef()
}
blk.incrRef()
t.opt.Cache.Set(key, blk, blk.size())
}
return blk, nil
}
Expand Down

0 comments on commit 8d68b04

Please sign in to comment.