Skip to content

Commit

Permalink
Revert "Buffer pool for decompression (#1308)" (#1408)
Browse files Browse the repository at this point in the history
This reverts commit aadda9a.

This commit is being reverted because we have seen some crashes
which could be caused by it. We haven't been able to reproduce the crashes yet.

Related to #1389, #1388, #1387
Also, see https://discuss.dgraph.io/t/current-state-of-badger-crashes/7602
  • Loading branch information
Ibrahim Jarif authored Jul 10, 2020
1 parent 63d9309 commit 800305e
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 78 deletions.
3 changes: 0 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,6 @@ func Open(opt Options) (db *DB, err error) {
MaxCost: int64(float64(opt.MaxCacheSize) * 0.95),
BufferItems: 64,
Metrics: true,
OnEvict: func(_, _ uint64, value interface{}, _ int64) {
table.BlockEvictHandler(value)
},
}
db.blockCache, err = ristretto.NewCache(&config)
if err != nil {
Expand Down
15 changes: 7 additions & 8 deletions table/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,10 @@ func NewTableBuilder(opts Options) *Builder {
return b
}

var blockPool = &sync.Pool{
var slicePool = sync.Pool{
New: func() interface{} {
// Create 5 KB blocks even when the default size of blocks is 4 KB. The
// ZSTD decompression library increases the buffer by 2X if it's not big
// enough. Using a 5 KB block instead of a 4 KB one avoids the
// unnecessary 2X allocation by the decompression library.
b := make([]byte, 5<<10)
// Make 4 KB blocks for reuse.
b := make([]byte, 0, 4<<10)
return &b
},
}
Expand All @@ -138,7 +135,9 @@ func (b *Builder) handleBlock() {
// Compress the block.
if b.opt.Compression != options.None {
var err error
dst = blockPool.Get().(*[]byte)

dst = slicePool.Get().(*[]byte)
*dst = (*dst)[:0]

blockBuf, err = b.compressData(*dst, blockBuf)
y.Check(err)
Expand Down Expand Up @@ -168,7 +167,7 @@ func (b *Builder) handleBlock() {
item.end = item.start + uint32(len(blockBuf))

if dst != nil {
blockPool.Put(dst)
slicePool.Put(dst)
}
}
}
Expand Down
13 changes: 1 addition & 12 deletions table/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,13 @@ type blockIterator struct {
key []byte
val []byte
entryOffsets []uint32
block *block

// prevOverlap stores the overlap of the previous key with the base key.
// This avoids unnecessary copy of base key when the overlap is same for multiple keys.
prevOverlap uint16
}

func (itr *blockIterator) setBlock(b *block) {
// Decrement the ref for the old block. If the old block was compressed, we
// might be able to reuse it.
itr.block.decrRef()
// Increment the ref for the new block.
b.incrRef()

itr.block = b
itr.err = nil
itr.idx = 0
itr.baseKey = itr.baseKey[:0]
Expand Down Expand Up @@ -110,9 +102,7 @@ func (itr *blockIterator) Error() error {
return itr.err
}

func (itr *blockIterator) Close() {
itr.block.decrRef()
}
func (itr *blockIterator) Close() {}

var (
origin = 0
Expand Down Expand Up @@ -182,7 +172,6 @@ func (t *Table) NewIterator(reversed bool) *Iterator {

// Close closes the iterator (and it must be called).
func (itr *Iterator) Close() error {
itr.bi.Close()
return itr.t.DecrRef()
}

Expand Down
67 changes: 12 additions & 55 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,44 +184,15 @@ func (t *Table) DecrRef() error {
return nil
}

// BlockEvictHandler is used to reuse the byte slice stored in the block on cache eviction.
func BlockEvictHandler(value interface{}) {
if b, ok := value.(*block); ok {
b.decrRef()
}
}

type block struct {
offset int
data []byte
checksum []byte
entriesIndexStart int // start index of entryOffsets list
entryOffsets []uint32 // used to binary search an entry in the block.
chkLen int // checksum length.
isReusable bool // used to determine if the block should be reused.
ref int32
entriesIndexStart int // start index of entryOffsets list
entryOffsets []uint32
chkLen int // checksum length
}

func (b *block) incrRef() {
atomic.AddInt32(&b.ref, 1)
}
func (b *block) decrRef() {
if b == nil {
return
}

p := atomic.AddInt32(&b.ref, -1)
// Insert the []byte into the pool only if the block is reusable. When a
// block is reusable, a new []byte is used for decompression and this
// []byte can be reused.
// In case of an uncompressed block, the []byte is a reference to the
// table.mmap []byte slice. Any attempt to write data to the mmap []byte
// will lead to a SEGFAULT.
if p == 0 && b.isReusable {
blockPool.Put(&b.data)
}
y.AssertTrue(p >= 0)
}
func (b *block) size() int64 {
return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4)
Expand Down Expand Up @@ -509,7 +480,8 @@ func (t *Table) block(idx int) (*block, error) {
}
}

if err = t.decompress(blk); err != nil {
blk.data, err = t.decompressData(blk.data)
if err != nil {
return nil, errors.Wrapf(err,
"failed to decode compressed data in file: %s at offset: %d, len: %d",
t.fd.Name(), blk.offset, ko.Len)
Expand Down Expand Up @@ -551,7 +523,6 @@ func (t *Table) block(idx int) (*block, error) {
}
if t.opt.Cache != nil && t.opt.KeepBlocksInCache {
key := t.blockCacheKey(idx)
blk.incrRef()
t.opt.Cache.Set(key, blk, blk.size())
}
return blk, nil
Expand Down Expand Up @@ -671,8 +642,7 @@ func (t *Table) VerifyChecksum() error {
return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
t.Filename(), i, os.Offset)
}
b.incrRef()
defer b.decrRef()

// In OnBlockRead or OnTableAndBlockRead mode we don't need to verify the
// block checksum here; verification is done while reading the block itself.
if !(t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead) {
Expand Down Expand Up @@ -738,28 +708,15 @@ func NewFilename(id uint64, dir string) string {
return filepath.Join(dir, IDToFilename(id))
}

// decompress decompresses the data stored in a block.
func (t *Table) decompress(b *block) error {
var err error
// decompressData decompresses the given data.
func (t *Table) decompressData(data []byte) ([]byte, error) {
switch t.opt.Compression {
case options.None:
// Nothing to be done here.
return data, nil
case options.Snappy:
dst := blockPool.Get().(*[]byte)
b.data, err = snappy.Decode(*dst, b.data)
if err != nil {
return errors.Wrap(err, "failed to decompress")
}
b.isReusable = true
return snappy.Decode(nil, data)
case options.ZSTD:
dst := blockPool.Get().(*[]byte)
b.data, err = y.ZSTDDecompress(*dst, b.data)
if err != nil {
return errors.Wrap(err, "failed to decompress")
}
b.isReusable = true
default:
return errors.New("Unsupported compression type")
return y.ZSTDDecompress(nil, data)
}
return nil
return nil, errors.New("Unsupported compression type")
}

0 comments on commit 800305e

Please sign in to comment.