From 21735af853dab8942e187dddbeeb41f3ecb04071 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 22 May 2020 18:53:42 +0530 Subject: [PATCH] fix: Fix race condition in block.incRef (#1337) Fixes https://github.com/dgraph-io/dgraph/issues/5456 . This PR fixes the crash that could occur when a block was read from the cache. There was a logical race condition. The following sequence of events could occur which would cause the crash. 1. An iterator makes `t.Block(idx)` call 2. The `t.Block` function finds the block in the cache. The newly found block has `ref=1` which means it was held only by the cache. 3. The `t.Block` function is holding the block and at the same time the block gets evicted from the cache. The existing ref of the block was `1` so the cache eviction would decrement the ref and make it `0`. When the ref becomes `0`, the block is added to the `sync.Pool` and is ready to be reused. 4. While the block got evicted from the cache, the iterator received the block and it incremented the ref from `0` to `1` and starts using this. Since the block was inserted into the syncPool in the 3rd event, it could be modified by anyone while the iterator is using it. --- badger/cmd/bank.go | 29 +++++++++++++++++++++++--- table/iterator.go | 2 -- table/table.go | 52 +++++++++++++++++++++++++++++++++++++++------- 3 files changed, 70 insertions(+), 13 deletions(-) diff --git a/badger/cmd/bank.go b/badger/cmd/bank.go index 5491eebd6..a312ca9e4 100644 --- a/badger/cmd/bank.go +++ b/badger/cmd/bank.go @@ -125,7 +125,7 @@ func toSlice(bal uint64) []byte { } func getBalance(txn *badger.Txn, account int) (uint64, error) { - item, err := txn.Get(key(account)) + item, err := get(txn, key(account)) if err != nil { return 0, err } @@ -197,6 +197,25 @@ func diff(a, b []account) string { var errFailure = errors.New("test failed due to balance mismatch") +// get function will fetch the value for the key "k" either by using the +// txn.Get API or the iterator.Seek API. +func get(txn *badger.Txn, k []byte) (*badger.Item, error) { + if rand.Int()%2 == 0 { + return txn.Get(k) + } + + iopt := badger.DefaultIteratorOptions + // PrefectValues is expensive. We don't need it here. + iopt.PrefetchValues = false + it := txn.NewIterator(iopt) + defer it.Close() + it.Seek(k) + if it.Valid() { + return it.Item(), nil + } + return nil, badger.ErrKeyNotFound +} + // seekTotal retrives the total of all accounts by seeking for each account key. func seekTotal(txn *badger.Txn) ([]account, error) { expected := uint64(numAccounts) * uint64(initialBal) @@ -204,7 +223,7 @@ func seekTotal(txn *badger.Txn) ([]account, error) { var total uint64 for i := 0; i < numAccounts; i++ { - item, err := txn.Get(key(i)) + item, err := get(txn, key(i)) if err != nil { log.Printf("Error for account: %d. err=%v. key=%q\n", i, err, key(i)) return accounts, err @@ -343,7 +362,11 @@ func runTest(cmd *cobra.Command, args []string) error { WithNumMemtables(2). // Do not GC any versions, because we need them for the disect.. WithNumVersionsToKeep(int(math.MaxInt32)). - WithValueThreshold(1) // Make all values go to value log + WithValueThreshold(1). // Make all values go to value log + WithCompression(options.ZSTD). + WithKeepL0InMemory(false). + WithMaxCacheSize(10 << 20) + if mmap { opts = opts.WithTableLoadingMode(options.MemoryMap) } diff --git a/table/iterator.go b/table/iterator.go index 574c3bc55..c987862ea 100644 --- a/table/iterator.go +++ b/table/iterator.go @@ -44,8 +44,6 @@ func (itr *blockIterator) setBlock(b *block) { // Decrement the ref for the old block. If the old block was compressed, we // might be able to reuse it. itr.block.decrRef() - // Increment the ref for the new block. - b.incrRef() itr.block = b itr.err = nil diff --git a/table/table.go b/table/table.go index 43406fa8c..da7bd44c8 100644 --- a/table/table.go +++ b/table/table.go @@ -187,25 +187,46 @@ type block struct { ref int32 } -func (b *block) incrRef() { - atomic.AddInt32(&b.ref, 1) +// incrRef increments the ref of a block and return a bool indicating if the +// increment was successful. A true value indicates that the block can be used. +func (b *block) incrRef() bool { + for { + // We can't blindly add 1 to ref. We need to check whether it has + // reached zero first, because if it did, then we should absolutely not + // use this block. + ref := atomic.LoadInt32(&b.ref) + // The ref would not be equal to 0 unless the existing + // block get evicted before this line. If the ref is zero, it means that + // the block is already added the the blockPool and cannot be used + // anymore. The ref of a new block is 1 so the following condition will + // be true only if the block got reused before we could increment its + // ref. + if ref == 0 { + return false + } + // Increment the ref only if it is not zero and has not changed between + // the time we read it and we're updating it. + // + if atomic.CompareAndSwapInt32(&b.ref, ref, ref+1) { + return true + } + } } func (b *block) decrRef() { if b == nil { return } - p := atomic.AddInt32(&b.ref, -1) // Insert the []byte into pool only if the block is resuable. When a block // is reusable a new []byte is used for decompression and this []byte can // be reused. // In case of an uncompressed block, the []byte is a reference to the // table.mmap []byte slice. Any attempt to write data to the mmap []byte // will lead to SEGFAULT. - if p == 0 && b.isReusable { + if atomic.AddInt32(&b.ref, -1) == 0 && b.isReusable { blockPool.Put(&b.data) } - y.AssertTrue(p >= 0) + y.AssertTrue(atomic.LoadInt32(&b.ref) >= 0) } func (b *block) size() int64 { return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ + @@ -419,6 +440,9 @@ func (t *Table) readIndex() error { return nil } +// block function return a new block. Each block holds a ref and the byte +// slice stored in the block will be reused when the ref becomes zero. The +// caller should release the block by calling block.decrRef() on it. func (t *Table) block(idx int) (*block, error) { y.AssertTruef(idx >= 0, "idx=%d", idx) if idx >= len(t.blockIndex) { @@ -428,12 +452,18 @@ func (t *Table) block(idx int) (*block, error) { key := t.blockCacheKey(idx) blk, ok := t.opt.Cache.Get(key) if ok && blk != nil { - return blk.(*block), nil + // Use the block only if the increment was successful. The block + // could get evicted from the cache between the Get() call and the + // incrRef() call. + if b := blk.(*block); b.incrRef() { + return b, nil + } } } ko := t.blockIndex[idx] blk := &block{ offset: int(ko.Offset), + ref: 1, } var err error if blk.data, err = t.read(blk.offset, int(ko.Len)); err != nil { @@ -490,8 +520,14 @@ func (t *Table) block(idx int) (*block, error) { } if t.opt.Cache != nil { key := t.blockCacheKey(idx) - blk.incrRef() - t.opt.Cache.Set(key, blk, blk.size()) + // incrRef should never return false here because we're calling it on a + // new block with ref=1. + y.AssertTrue(blk.incrRef()) + + // Decrement the block ref if we could not insert it in the cache. + if !t.opt.Cache.Set(key, blk, blk.size()) { + blk.decrRef() + } } return blk, nil }