diff --git a/db2_test.go b/db2_test.go index d86e5a40a..4c29d6358 100644 --- a/db2_test.go +++ b/db2_test.go @@ -670,13 +670,16 @@ func TestL0GCBug(t *testing.T) { // Simulate a crash by not closing db1 but releasing the locks. if db1.dirLockGuard != nil { require.NoError(t, db1.dirLockGuard.release()) - db1.dirLockGuard = nil } if db1.valueDirGuard != nil { require.NoError(t, db1.valueDirGuard.release()) - db1.valueDirGuard = nil } - require.NoError(t, db1.Close()) + for _, f := range db1.vlog.filesMap { + require.NoError(t, f.fd.Close()) + } + require.NoError(t, db1.registry.Close()) + require.NoError(t, db1.lc.close()) + require.NoError(t, db1.manifest.close()) db2, err := Open(opts) require.NoError(t, err) diff --git a/options.go b/options.go index 4a0d1e0c3..cb77fd7cf 100644 --- a/options.go +++ b/options.go @@ -176,7 +176,6 @@ func DefaultOptions(path string) Options { func buildTableOptions(opt Options) table.Options { return table.Options{ - TableSize: uint64(opt.MaxTableSize), BlockSize: opt.BlockSize, BloomFalsePositive: opt.BloomFalsePositive, LoadBloomsOnOpen: opt.LoadBloomsOnOpen, diff --git a/table/builder.go b/table/builder.go index 5715882c7..7c7b9d2ca 100644 --- a/table/builder.go +++ b/table/builder.go @@ -17,10 +17,9 @@ package table import ( + "bytes" "crypto/aes" "math" - "runtime" - "sync" "unsafe" "github.com/dgryski/go-farm" @@ -34,14 +33,11 @@ import ( "github.com/dgraph-io/ristretto/z" ) -const ( - KB = 1024 - MB = KB * 1024 - - // When a block is encrypted, it's length increases. We add 200 bytes of padding to - // handle cases when block size increases. This is an approximate number. - padding = 200 -) +func newBuffer(sz int) *bytes.Buffer { + b := new(bytes.Buffer) + b.Grow(sz) + return b +} type header struct { overlap uint16 // Overlap with base key. @@ -65,18 +61,10 @@ func (h *header) Decode(buf []byte) { copy(((*[headerSize]byte)(unsafe.Pointer(h))[:]), buf[:headerSize]) } -type bblock struct { - data []byte - start uint32 // Points to the starting offset of the block. - end uint32 // Points to the end offset of the block. -} - // Builder is used in building a table. type Builder struct { // Typically tens or hundreds of meg. This is for one single file. - buf []byte - sz uint32 - bufLock sync.Mutex // This lock guards the buf. We acquire lock when we resize the buf. + buf *bytes.Buffer baseKey []byte // Base key for the current block. baseOffset uint32 // Offset for the current block. @@ -84,100 +72,23 @@ type Builder struct { tableIndex *pb.TableIndex keyHashes []uint64 // Used for building the bloomfilter. opt *Options - - // Used to concurrently compress/encrypt blocks. - wg sync.WaitGroup - blockChan chan *bblock - blockList []*bblock } // NewTableBuilder makes a new TableBuilder. func NewTableBuilder(opts Options) *Builder { - b := &Builder{ - // Additional 5 MB to store index (approximate). - // We trim the additional space in table.Finish(). - buf: make([]byte, opts.TableSize+5*MB), + return &Builder{ + buf: newBuffer(1 << 20), tableIndex: &pb.TableIndex{}, keyHashes: make([]uint64, 0, 1024), // Avoid some malloc calls. opt: &opts, } - - // If encryption or compression is not enabled, do not start compression/encryption goroutines - // and write directly to the buffer. - if b.opt.Compression == options.None && b.opt.DataKey == nil { - return b - } - - count := 2 * runtime.NumCPU() - b.blockChan = make(chan *bblock, count*2) - - b.wg.Add(count) - for i := 0; i < count; i++ { - go b.handleBlock() - } - return b -} - -var blockPool = &sync.Pool{ - New: func() interface{} { - // Create 5 Kb blocks even when the default size of blocks is 4 KB. The - // ZSTD decompresion library increases the buffer by 2X if it's not big - // enough. Using a 5 KB block instead of a 4 KB one avoids the - // unncessary 2X allocation by the decompression library. - b := make([]byte, 5<<10) - return &b - }, -} - -func (b *Builder) handleBlock() { - defer b.wg.Done() - for item := range b.blockChan { - // Extract the block. - blockBuf := item.data[item.start:item.end] - var dst *[]byte - // Compress the block. - if b.opt.Compression != options.None { - var err error - dst = blockPool.Get().(*[]byte) - - blockBuf, err = b.compressData(*dst, blockBuf) - y.Check(err) - } - if b.shouldEncrypt() { - eBlock, err := b.encrypt(blockBuf) - y.Check(y.Wrapf(err, "Error while encrypting block in table builder.")) - blockBuf = eBlock - } - - // BlockBuf should always less than or equal to allocated space. If the blockBuf is greater - // than allocated space that means the data from this block cannot be stored in its - // existing location and trying to copy it over would mean we would over-write some data - // of the next block. - allocatedSpace := (item.end - item.start) + padding + 1 - y.AssertTruef(uint32(len(blockBuf)) <= allocatedSpace, "newend: %d oldend: %d padding: %d", - item.start+uint32(len(blockBuf)), item.end, padding) - - // Acquire the buflock here. The builder.grow function might change - // the b.buf while this goroutine was running. - b.bufLock.Lock() - // Copy over compressed/encrypted data back to the main buffer. - copy(b.buf[item.start:], blockBuf) - b.bufLock.Unlock() - - // Fix the boundary of the block. - item.end = item.start + uint32(len(blockBuf)) - - if dst != nil { - blockPool.Put(dst) - } - } } // Close closes the TableBuilder. func (b *Builder) Close() {} // Empty returns whether it's empty. -func (b *Builder) Empty() bool { return b.sz == 0 } +func (b *Builder) Empty() bool { return b.buf.Len() == 0 } // keyDiff returns a suffix of newKey that is different from b.baseKey. func (b *Builder) keyDiff(newKey []byte) []byte { @@ -213,54 +124,20 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) { } // store current entry's offset - y.AssertTrue(b.sz < math.MaxUint32) - b.entryOffsets = append(b.entryOffsets, b.sz-b.baseOffset) + y.AssertTrue(uint32(b.buf.Len()) < math.MaxUint32) + b.entryOffsets = append(b.entryOffsets, uint32(b.buf.Len())-b.baseOffset) // Layout: header, diffKey, value. - b.append(h.Encode()) - b.append(diffKey) - - if uint32(len(b.buf)) < b.sz+v.EncodedSize() { - b.grow(v.EncodedSize()) - } - b.sz += v.Encode(b.buf[b.sz:]) + b.buf.Write(h.Encode()) + b.buf.Write(diffKey) // We only need to store the key difference. + v.EncodeTo(b.buf) // Size of KV on SST. sstSz := uint64(uint32(headerSize) + uint32(len(diffKey)) + v.EncodedSize()) // Total estimated size = size on SST + size on vlog (length of value pointer). b.tableIndex.EstimatedSize += (sstSz + vpLen) } -// grow increases the size of b.buf by atleast 50%. -func (b *Builder) grow(n uint32) { - l := uint32(len(b.buf)) - if n < l/2 { - n = l / 2 - } - y.AssertTruef(uint64(len(b.buf))+uint64(n) < math.MaxUint32, - "old: %d new: %d", len(b.buf), n) - b.bufLock.Lock() - newBuf := make([]byte, l+n) - copy(newBuf, b.buf) - b.buf = newBuf - b.bufLock.Unlock() -} -func (b *Builder) append(data []byte) { - // Ensure we have enough space to store new data. - if uint32(len(b.buf)) < b.sz+uint32(len(data)) { - b.grow(uint32(len(data))) - } - copy(b.buf[b.sz:], data) - b.sz += uint32(len(data)) -} - -func (b *Builder) addPadding(sz uint32) { - if uint32(len(b.buf)) < b.sz+sz { - b.grow(sz) - } - b.sz += sz -} - /* Structure of Block. +-------------------+---------------------+--------------------+--------------+------------------+ @@ -274,36 +151,41 @@ Structure of Block. */ // In case the data is encrypted, the "IV" is added to the end of the block. func (b *Builder) finishBlock() { - b.append(y.U32SliceToBytes(b.entryOffsets)) - b.append(y.U32ToBytes(uint32(len(b.entryOffsets)))) - - b.writeChecksum(b.buf[b.baseOffset:b.sz]) - - // If compression/encryption is disabled, no need to send the block to the blockChan. - // There's nothing to be done. - if b.blockChan == nil { - b.addBlockToIndex() - return + b.buf.Write(y.U32SliceToBytes(b.entryOffsets)) + b.buf.Write(y.U32ToBytes(uint32(len(b.entryOffsets)))) + + blockBuf := b.buf.Bytes()[b.baseOffset:] // Store checksum for current block. + b.writeChecksum(blockBuf) + + // Compress the block. + if b.opt.Compression != options.None { + var err error + // TODO: Find a way to reuse buffers. Current implementation creates a + // new buffer for each compressData call. + blockBuf, err = b.compressData(b.buf.Bytes()[b.baseOffset:]) + y.Check(err) + // Truncate already written data. + b.buf.Truncate(int(b.baseOffset)) + // Write compressed data. + b.buf.Write(blockBuf) + } + if b.shouldEncrypt() { + block := b.buf.Bytes()[b.baseOffset:] + eBlock, err := b.encrypt(block) + y.Check(y.Wrapf(err, "Error while encrypting block in table builder.")) + // We're rewriting the block, after encrypting. + b.buf.Truncate(int(b.baseOffset)) + b.buf.Write(eBlock) } - b.addPadding(padding) - - // Block end is the actual end of the block ignoring the padding. - block := &bblock{start: b.baseOffset, end: uint32(b.sz - padding), data: b.buf} - b.blockList = append(b.blockList, block) - - b.addBlockToIndex() - // Push to the block handler. - b.blockChan <- block -} + // TODO(Ashish):Add padding: If we want to make block as multiple of OS pages, we can + // implement padding. This might be useful while using direct I/O. -func (b *Builder) addBlockToIndex() { - blockBuf := b.buf[b.baseOffset:b.sz] - // Add key to the block index. + // Add key to the block index bo := &pb.BlockOffset{ Key: y.Copy(b.baseKey), Offset: b.baseOffset, - Len: uint32(len(blockBuf)), + Len: uint32(b.buf.Len()) - b.baseOffset, } b.tableIndex.Offsets = append(b.tableIndex.Offsets, bo) } @@ -321,7 +203,7 @@ func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool { 4 + // size of list 8 + // Sum64 in checksum proto 4) // checksum length - estimatedSize := uint32(b.sz) - b.baseOffset + uint32(6 /*header size for entry*/) + + estimatedSize := uint32(b.buf.Len()) - b.baseOffset + uint32(6 /*header size for entry*/) + uint32(len(key)) + uint32(value.EncodedSize()) + entriesOffsetsSize if b.shouldEncrypt() { @@ -334,12 +216,11 @@ func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool { // Add adds a key-value pair to the block. func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) { - y.AssertTrue(b.sz < math.MaxUint32) if b.shouldFinishBlock(key, value) { b.finishBlock() // Start a new block. Initialize the block. b.baseKey = []byte{} - b.baseOffset = uint32((b.sz)) + b.baseOffset = uint32(b.buf.Len()) b.entryOffsets = b.entryOffsets[:0] } b.addHelper(key, value, uint64(valueLen)) @@ -353,14 +234,14 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) { // ReachedCapacity returns true if we... roughly (?) reached capacity? func (b *Builder) ReachedCapacity(cap int64) bool { - blocksSize := b.sz + // length of current buffer - uint32(len(b.entryOffsets)*4) + // all entry offsets size + blocksSize := b.buf.Len() + // length of current buffer + len(b.entryOffsets)*4 + // all entry offsets size 4 + // count of all entry offsets 8 + // checksum bytes 4 // checksum length estimateSz := blocksSize + 4 + // Index length - 5*(uint32(len(b.tableIndex.Offsets))) // approximate index size + 5*(len(b.tableIndex.Offsets)) // approximate index size return int64(estimateSz) > cap } @@ -387,35 +268,6 @@ func (b *Builder) Finish() []byte { b.finishBlock() // This will never start a new block. - if b.blockChan != nil { - close(b.blockChan) - } - // Wait for block handler to finish. - b.wg.Wait() - - dst := b.buf - // Fix block boundaries. This includes moving the blocks so that we - // don't have any interleaving space between them. - if len(b.blockList) > 0 { - dstLen := uint32(0) - for i, bl := range b.blockList { - off := b.tableIndex.Offsets[i] - // Length of the block is end minus the start. - off.Len = bl.end - bl.start - // New offset of the block is the point in the main buffer till - // which we have written data. - off.Offset = dstLen - - copy(dst[dstLen:], b.buf[bl.start:bl.end]) - - // New length is the start of the block plus its length. - dstLen = off.Offset + off.Len - } - // Start writing to the buffer from the point until which we have valid data. - // Fix the length because append and writeChecksum also rely on it. - b.sz = dstLen - } - index, err := proto.Marshal(b.tableIndex) y.Check(err) @@ -423,12 +275,17 @@ func (b *Builder) Finish() []byte { index, err = b.encrypt(index) y.Check(err) } - // Write index the buffer. - b.append(index) - b.append(y.U32ToBytes(uint32(len(index)))) + // Write index the file. + n, err := b.buf.Write(index) + y.Check(err) + + y.AssertTrue(uint32(n) < math.MaxUint32) + // Write index size. + _, err = b.buf.Write(y.U32ToBytes(uint32(n))) + y.Check(err) b.writeChecksum(index) - return b.buf[:b.sz] + return b.buf.Bytes() } func (b *Builder) writeChecksum(data []byte) { @@ -449,10 +306,13 @@ func (b *Builder) writeChecksum(data []byte) { // Write checksum to the file. chksum, err := proto.Marshal(&checksum) y.Check(err) - b.append(chksum) + n, err := b.buf.Write(chksum) + y.Check(err) + y.AssertTrue(uint32(n) < math.MaxUint32) // Write checksum size. - b.append(y.U32ToBytes(uint32(len(chksum)))) + _, err = b.buf.Write(y.U32ToBytes(uint32(n))) + y.Check(err) } // DataKey returns datakey of the builder. @@ -482,14 +342,14 @@ func (b *Builder) shouldEncrypt() bool { } // compressData compresses the given data. -func (b *Builder) compressData(dst, data []byte) ([]byte, error) { +func (b *Builder) compressData(data []byte) ([]byte, error) { switch b.opt.Compression { case options.None: return data, nil case options.Snappy: - return snappy.Encode(dst, data), nil + return snappy.Encode(nil, data), nil case options.ZSTD: - return y.ZSTDCompress(dst, data, b.opt.ZSTDCompressionLevel) + return y.ZSTDCompress(nil, data, b.opt.ZSTDCompressionLevel) } return nil, errors.New("Unsupported compression type") } diff --git a/table/builder_test.go b/table/builder_test.go index dfa7f63fb..34e0c282e 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -32,60 +32,30 @@ import ( func TestTableIndex(t *testing.T) { rand.Seed(time.Now().Unix()) - keysCount := 100000 - key := make([]byte, 32) - _, err := rand.Read(key) - require.NoError(t, err) - subTest := []struct { - name string - opts Options - }{ - { - name: "No encyption/compression", - opts: Options{ - BlockSize: 4 * 1024, - BloomFalsePositive: 0.01, - TableSize: 30 << 20, - }, - }, - { - // Encryption mode. - name: "Only encryption", - opts: Options{ - BlockSize: 4 * 1024, - BloomFalsePositive: 0.01, - TableSize: 30 << 20, - DataKey: &pb.DataKey{Data: key}, - }, - }, - { - // Compression mode. - name: "Only compression", - opts: Options{ - BlockSize: 4 * 1024, - BloomFalsePositive: 0.01, - TableSize: 30 << 20, - Compression: options.ZSTD, - ZSTDCompressionLevel: 3, - }, - }, - { - // Compression mode and encryption. - name: "Compression and encryption", - opts: Options{ - BlockSize: 4 * 1024, - BloomFalsePositive: 0.01, - TableSize: 30 << 20, - Compression: options.ZSTD, - ZSTDCompressionLevel: 3, - DataKey: &pb.DataKey{Data: key}, - }, - }, - } + keyPrefix := "key" + t.Run("single key", func(t *testing.T) { + opts := Options{Compression: options.ZSTD} + f := buildTestTable(t, keyPrefix, 1, opts) + tbl, err := OpenTable(f, opts) + require.NoError(t, err) + require.Len(t, tbl.blockIndex, 1) + }) - for _, tt := range subTest { - t.Run(tt.name, func(t *testing.T) { - opt := tt.opts + t.Run("multiple keys", func(t *testing.T) { + opts := []Options{} + // Normal mode. + opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01}) + // Encryption mode. + key := make([]byte, 32) + _, err := rand.Read(key) + require.NoError(t, err) + opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, + DataKey: &pb.DataKey{Data: key}}) + // Compression mode. + opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, + Compression: options.ZSTD}) + keysCount := 10000 + for _, opt := range opts { builder := NewTableBuilder(opt) filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Uint32()) f, err := y.OpenSyncedFile(filename, true) @@ -111,9 +81,8 @@ func TestTableIndex(t *testing.T) { tbl, err := OpenTable(f, opt) require.NoError(t, err, "unable to open table") - if opt.DataKey == nil { - // key id is zero if there is no datakey. + // key id is zero if thre is no datakey. require.Equal(t, tbl.KeyID(), uint64(0)) } @@ -124,8 +93,8 @@ func TestTableIndex(t *testing.T) { } f.Close() require.NoError(t, os.RemoveAll(filename)) - }) - } + } + }) } func TestInvalidCompression(t *testing.T) { @@ -156,21 +125,18 @@ func BenchmarkBuilder(b *testing.B) { keysCount := 1300000 // This number of entries consumes ~64MB of memory. - var keyList [][]byte - for i := 0; i < keysCount; i++ { - keyList = append(keyList, key(i)) - } bench := func(b *testing.B, opt *Options) { + // KeyCount * (keySize + ValSize) b.SetBytes(int64(keysCount) * (32 + 32)) - opt.BlockSize = 4 * 1024 - opt.BloomFalsePositive = 0.01 - opt.TableSize = 5 << 20 - b.ResetTimer() for i := 0; i < b.N; i++ { + opt.BlockSize = 4 * 1024 + opt.BloomFalsePositive = 0.01 builder := NewTableBuilder(*opt) - for j := 0; j < keysCount; j++ { - builder.Add(keyList[j], vs, 0) + + for i := 0; i < keysCount; i++ { + builder.Add(key(i), vs, 0) } + _ = builder.Finish() } } @@ -180,13 +146,6 @@ func BenchmarkBuilder(b *testing.B) { opt.Compression = options.None bench(b, &opt) }) - b.Run("encryption", func(b *testing.B) { - var opt Options - key := make([]byte, 32) - rand.Read(key) - opt.DataKey = &pb.DataKey{Data: key} - bench(b, &opt) - }) b.Run("zstd compression", func(b *testing.B) { var opt Options opt.Compression = options.ZSTD diff --git a/table/iterator.go b/table/iterator.go index d48e58138..5f3df6a99 100644 --- a/table/iterator.go +++ b/table/iterator.go @@ -73,7 +73,6 @@ func (itr *blockIterator) setIdx(i int) { baseHeader.Decode(itr.data) itr.baseKey = itr.data[headerSize : headerSize+baseHeader.diff] } - var endOffset int // idx points to the last entry in the block. if itr.idx+1 == len(itr.entryOffsets) { diff --git a/table/table.go b/table/table.go index e66fa63d0..e7edc7b45 100644 --- a/table/table.go +++ b/table/table.go @@ -56,9 +56,6 @@ const sizeOfOffsetStruct int64 = 3*8 + // key array take 3 words type Options struct { // Options for Opening/Building Table. - // Maximum size of the table. - TableSize uint64 - // ChkMode is the checksum verification mode for Table. ChkMode options.ChecksumVerificationMode @@ -552,7 +549,7 @@ func (t *Table) block(idx int) (*block, error) { // Checksum length greater than block size could happen if the table was compressed and // it was opened with an incorrect compression algorithm (or the data was corrupted). if blk.chkLen > len(blk.data) { - return nil, errors.New("invalid checksum length. Either the data is " + + return nil, errors.New("invalid checksum length. Either the data is" + "corrupted or the table options are incorrectly set") } diff --git a/table/table_test.go b/table/table_test.go index a39d88e5a..8d3ce0aba 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -37,6 +37,11 @@ import ( "github.com/stretchr/testify/require" ) +const ( + KB = 1024 + MB = KB * 1024 +) + func key(prefix string, i int) string { return prefix + fmt.Sprintf("%04d", i) } @@ -353,7 +358,7 @@ func TestIterateBackAndForth(t *testing.T) { it.seekToFirst() k = it.Key() - require.EqualValues(t, key("key", 0), string(y.ParseKey(k))) + require.EqualValues(t, key("key", 0), y.ParseKey(k)) } func TestUniIterator(t *testing.T) { @@ -698,8 +703,7 @@ func TestTableBigValues(t *testing.T) { require.NoError(t, err, "unable to create file") n := 100 // Insert 100 keys. - opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01, - TableSize: uint64(n) * 1 << 20} + opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01} builder := NewTableBuilder(opts) for i := 0; i < n; i++ { key := y.KeyWithTs([]byte(key("", i)), 0) diff --git a/y/iterator.go b/y/iterator.go index 7c9b21194..6d0f677c0 100644 --- a/y/iterator.go +++ b/y/iterator.go @@ -64,12 +64,11 @@ func (v *ValueStruct) Decode(b []byte) { } // Encode expects a slice of length at least v.EncodedSize(). -func (v *ValueStruct) Encode(b []byte) uint32 { +func (v *ValueStruct) Encode(b []byte) { b[0] = v.Meta b[1] = v.UserMeta sz := binary.PutUvarint(b[2:], v.ExpiresAt) - n := copy(b[2+sz:], v.Value) - return uint32(2 + sz + n) + copy(b[2+sz:], v.Value) } // EncodeTo should be kept in sync with the Encode function above. The reason @@ -80,7 +79,6 @@ func (v *ValueStruct) EncodeTo(buf *bytes.Buffer) { buf.WriteByte(v.UserMeta) var enc [binary.MaxVarintLen64]byte sz := binary.PutUvarint(enc[:], v.ExpiresAt) - buf.Write(enc[:sz]) buf.Write(v.Value) }