diff --git a/db2_test.go b/db2_test.go
index 56d9ca145..cb8ccbc8a 100644
--- a/db2_test.go
+++ b/db2_test.go
@@ -669,16 +669,13 @@ func TestL0GCBug(t *testing.T) {
 	// Simulate a crash by not closing db1 but releasing the locks.
 	if db1.dirLockGuard != nil {
 		require.NoError(t, db1.dirLockGuard.release())
+		db1.dirLockGuard = nil
 	}
 	if db1.valueDirGuard != nil {
 		require.NoError(t, db1.valueDirGuard.release())
+		db1.valueDirGuard = nil
 	}
-	for _, f := range db1.vlog.filesMap {
-		require.NoError(t, f.fd.Close())
-	}
-	require.NoError(t, db1.registry.Close())
-	require.NoError(t, db1.lc.close())
-	require.NoError(t, db1.manifest.close())
+	require.NoError(t, db1.Close())
 
 	db2, err := Open(opts)
 	require.NoError(t, err)
diff --git a/options.go b/options.go
index 4fbe09199..0e3c60e3f 100644
--- a/options.go
+++ b/options.go
@@ -155,6 +155,7 @@ func DefaultOptions(path string) Options {
 
 func buildTableOptions(opt Options) table.Options {
 	return table.Options{
+		TableSize:          uint64(opt.MaxTableSize),
 		BlockSize:          opt.BlockSize,
 		BloomFalsePositive: opt.BloomFalsePositive,
 		LoadingMode:        opt.TableLoadingMode,
diff --git a/table/builder.go b/table/builder.go
index 26519494b..3ac1d77d5 100644
--- a/table/builder.go
+++ b/table/builder.go
@@ -17,9 +17,10 @@ package table
 
 import (
-	"bytes"
 	"crypto/aes"
 	"math"
+	"runtime"
+	"sync"
 	"unsafe"
 
 	"github.com/dgryski/go-farm"
@@ -33,11 +34,14 @@ import (
 	"github.com/dgraph-io/ristretto/z"
 )
 
-func newBuffer(sz int) *bytes.Buffer {
-	b := new(bytes.Buffer)
-	b.Grow(sz)
-	return b
-}
+const (
+	KB = 1024
+	MB = KB * 1024
+
+	// When a block is encrypted, its length increases. We add 200 bytes of padding to
+	// handle the case when the block size increases. This is an approximate number.
+	padding = 200
+)
 
 type header struct {
 	overlap uint16 // Overlap with base key.
@@ -61,10 +65,18 @@ func (h *header) Decode(buf []byte) {
 	copy(((*[headerSize]byte)(unsafe.Pointer(h))[:]), buf[:headerSize])
 }
 
+type bblock struct {
+	data  []byte
+	start uint32 // Points to the starting offset of the block.
+	end   uint32 // Points to the end offset of the block.
+}
+
 // Builder is used in building a table.
 type Builder struct {
 	// Typically tens or hundreds of meg. This is for one single file.
-	buf *bytes.Buffer
+	buf     []byte
+	sz      uint32
+	bufLock sync.Mutex // This lock guards buf. We acquire it when we resize buf.
 
 	baseKey      []byte // Base key for the current block.
 	baseOffset   uint32 // Offset for the current block.
@@ -72,23 +84,99 @@ type Builder struct {
 	tableIndex  *pb.TableIndex
 	keyHashes   []uint64 // Used for building the bloomfilter.
 	opt         *Options
+
+	// Used to concurrently compress/encrypt blocks.
+	wg        sync.WaitGroup
+	blockChan chan *bblock
+	blockList []*bblock
 }
 
 // NewTableBuilder makes a new TableBuilder.
 func NewTableBuilder(opts Options) *Builder {
-	return &Builder{
-		buf: newBuffer(1 << 20),
+	b := &Builder{
+		// Additional 5 MB to store the index (approximate).
+		// We trim the additional space in table.Finish().
+		buf:        make([]byte, opts.TableSize+5*MB),
 		tableIndex: &pb.TableIndex{},
 		keyHashes:  make([]uint64, 0, 1024), // Avoid some malloc calls.
 		opt:        &opts,
 	}
+
+	// If encryption or compression is not enabled, do not start the
+	// compression/encryption goroutines and write directly to the buffer.
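+	// This keeps the common path free of channel sends and goroutine scheduling
+	// overhead when blocks need no post-processing.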
+	if b.opt.Compression == options.None && b.opt.DataKey == nil {
+		return b
+	}
+
+	count := 2 * runtime.NumCPU()
+	b.blockChan = make(chan *bblock, count*2)
+
+	b.wg.Add(count)
+	for i := 0; i < count; i++ {
+		go b.handleBlock()
+	}
+	return b
+}
+
+var slicePool = sync.Pool{
+	New: func() interface{} {
+		// Make 4 KB blocks for reuse.
+		b := make([]byte, 0, 4<<10)
+		return &b
+	},
+}
+
+func (b *Builder) handleBlock() {
+	defer b.wg.Done()
+	for item := range b.blockChan {
+		// Extract the block.
+		blockBuf := item.data[item.start:item.end]
+		var dst *[]byte
+		// Compress the block.
+		if b.opt.Compression != options.None {
+			var err error
+
+			dst = slicePool.Get().(*[]byte)
+			*dst = (*dst)[:0]
+
+			blockBuf, err = b.compressData(*dst, blockBuf)
+			y.Check(err)
+		}
+		if b.shouldEncrypt() {
+			eBlock, err := b.encrypt(blockBuf)
+			y.Check(y.Wrapf(err, "Error while encrypting block in table builder."))
+			blockBuf = eBlock
+		}
+
+		// The new end of the block should always be less than or equal to the
+		// original end plus the padding. If the new end were greater than
+		// item.end+padding, the data from this block could not be stored in its
+		// existing location, and copying it over would overwrite some data of
+		// the next block.
+		newEnd := item.start + uint32(len(blockBuf))
+		y.AssertTruef(newEnd <= item.end+padding,
+			"newEnd: %d item.end: %d padding: %d", newEnd, item.end, padding)
+
+		// Acquire bufLock here. The builder.grow function may reallocate b.buf
+		// while this goroutine is running.
+		b.bufLock.Lock()
+		// Copy the compressed/encrypted data back to the main buffer.
+		copy(b.buf[item.start:], blockBuf)
+		b.bufLock.Unlock()
+
+		// Fix the boundary of the block.
+		item.end = item.start + uint32(len(blockBuf))
+
+		if dst != nil {
+			slicePool.Put(dst)
+		}
+	}
+}
 
 // Close closes the TableBuilder.
 func (b *Builder) Close() {}
 
 // Empty returns whether it's empty.
-func (b *Builder) Empty() bool { return b.buf.Len() == 0 }
+func (b *Builder) Empty() bool { return b.sz == 0 }
 
 // keyDiff returns a suffix of newKey that is different from b.baseKey.
 func (b *Builder) keyDiff(newKey []byte) []byte {
@@ -121,20 +209,52 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint64) {
 	}
 
 	// store current entry's offset
-	y.AssertTrue(uint32(b.buf.Len()) < math.MaxUint32)
-	b.entryOffsets = append(b.entryOffsets, uint32(b.buf.Len())-b.baseOffset)
+	y.AssertTrue(b.sz < math.MaxUint32)
+	b.entryOffsets = append(b.entryOffsets, b.sz-b.baseOffset)
 
 	// Layout: header, diffKey, value.
-	b.buf.Write(h.Encode())
-	b.buf.Write(diffKey) // We only need to store the key difference.
+	b.append(h.Encode())
+	b.append(diffKey)
+
+	if uint32(len(b.buf)) < b.sz+v.EncodedSize() {
+		b.grow(v.EncodedSize())
+	}
+	b.sz += v.Encode(b.buf[b.sz:])
 
-	v.EncodeTo(b.buf)
 	// Size of KV on SST.
 	sstSz := uint64(uint32(headerSize) + uint32(len(diffKey)) + v.EncodedSize())
 	// Total estimated size = size on SST + size on vlog (length of value pointer).
 	b.tableIndex.EstimatedSize += (sstSz + vpLen)
 }
 
+// grow increases the size of b.buf by at least 50%.
+func (b *Builder) grow(n uint32) {
+	l := uint32(len(b.buf))
+	if n < l/2 {
+		n = l / 2
+	}
+	b.bufLock.Lock()
+	newBuf := make([]byte, l+n)
+	copy(newBuf, b.buf)
+	b.buf = newBuf
+	b.bufLock.Unlock()
+}
+
+func (b *Builder) append(data []byte) {
+	// Ensure we have enough space to store the new data.
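+	// grow expands buf by at least 50%, so repeated appends remain amortized O(1).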
+	if uint32(len(b.buf)) < b.sz+uint32(len(data)) {
+		b.grow(uint32(len(data)))
+	}
+	copy(b.buf[b.sz:], data)
+	b.sz += uint32(len(data))
+}
+
+func (b *Builder) addPadding(sz uint32) {
+	if uint32(len(b.buf)) < b.sz+sz {
+		b.grow(sz)
+	}
+	b.sz += sz
+}
+
 /*
 Structure of Block.
 +-------------------+---------------------+--------------------+--------------+------------------+
@@ -148,41 +268,36 @@ Structure of Block.
 */
 // In case the data is encrypted, the "IV" is added to the end of the block.
 func (b *Builder) finishBlock() {
-	b.buf.Write(y.U32SliceToBytes(b.entryOffsets))
-	b.buf.Write(y.U32ToBytes(uint32(len(b.entryOffsets))))
-
-	blockBuf := b.buf.Bytes()[b.baseOffset:] // Store checksum for current block.
-	b.writeChecksum(blockBuf)
-
-	// Compress the block.
-	if b.opt.Compression != options.None {
-		var err error
-		// TODO: Find a way to reuse buffers. Current implementation creates a
-		// new buffer for each compressData call.
-		blockBuf, err = b.compressData(b.buf.Bytes()[b.baseOffset:])
-		y.Check(err)
-		// Truncate already written data.
-		b.buf.Truncate(int(b.baseOffset))
-		// Write compressed data.
-		b.buf.Write(blockBuf)
-	}
-	if b.shouldEncrypt() {
-		block := b.buf.Bytes()[b.baseOffset:]
-		eBlock, err := b.encrypt(block)
-		y.Check(y.Wrapf(err, "Error while encrypting block in table builder."))
-		// We're rewriting the block, after encrypting.
-		b.buf.Truncate(int(b.baseOffset))
-		b.buf.Write(eBlock)
+	b.append(y.U32SliceToBytes(b.entryOffsets))
+	b.append(y.U32ToBytes(uint32(len(b.entryOffsets))))
+
+	b.writeChecksum(b.buf[b.baseOffset:b.sz])
+
+	// If compression/encryption is disabled, there is no need to send the block
+	// to the blockChan. There's nothing more to be done.
+	if b.blockChan == nil {
+		b.addBlockToIndex()
+		return
 	}
 
-	// TODO(Ashish):Add padding: If we want to make block as multiple of OS pages, we can
-	// implement padding. This might be useful while using direct I/O.
+	b.addPadding(padding)
 
-	// Add key to the block index
+	// The block end is the actual end of the block, ignoring the padding.
+	block := &bblock{start: b.baseOffset, end: b.sz - padding, data: b.buf}
+	b.blockList = append(b.blockList, block)
+
+	b.addBlockToIndex()
+	// Push to the block handler.
+	b.blockChan <- block
+}
+
+func (b *Builder) addBlockToIndex() {
+	blockBuf := b.buf[b.baseOffset:b.sz]
+	// Add key to the block index.
 	bo := &pb.BlockOffset{
 		Key:    y.Copy(b.baseKey),
 		Offset: b.baseOffset,
-		Len:    uint32(b.buf.Len()) - b.baseOffset,
+		Len:    uint32(len(blockBuf)),
 	}
 	b.tableIndex.Offsets = append(b.tableIndex.Offsets, bo)
 }
@@ -200,7 +315,7 @@ func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool {
 		4 + // size of list
 		8 + // Sum64 in checksum proto
 		4) // checksum length
-	estimatedSize := uint32(b.buf.Len()) - b.baseOffset + uint32(6 /*header size for entry*/) +
+	estimatedSize := b.sz - b.baseOffset + uint32(6 /*header size for entry*/) +
 		uint32(len(key)) + uint32(value.EncodedSize()) + entriesOffsetsSize
 
 	if b.shouldEncrypt() {
@@ -217,8 +332,8 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) {
 		b.finishBlock()
 		// Start a new block. Initialize the block.
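+		// baseKey, baseOffset, and entryOffsets are all block-local state.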
 		b.baseKey = []byte{}
-		y.AssertTrue(uint32(b.buf.Len()) < math.MaxUint32)
-		b.baseOffset = uint32(b.buf.Len())
+		y.AssertTrue(b.sz < math.MaxUint32)
+		b.baseOffset = b.sz
 		b.entryOffsets = b.entryOffsets[:0]
 	}
 	b.addHelper(key, value, uint64(valueLen))
@@ -232,14 +347,14 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) {
 
 // ReachedCapacity returns true if we... roughly (?) reached capacity?
 func (b *Builder) ReachedCapacity(cap int64) bool {
-	blocksSize := b.buf.Len() + // length of current buffer
-		len(b.entryOffsets)*4 + // all entry offsets size
+	blocksSize := b.sz + // length of current buffer
+		uint32(len(b.entryOffsets)*4) + // all entry offsets size
 		4 + // count of all entry offsets
 		8 + // checksum bytes
 		4 // checksum length
 
 	estimateSz := blocksSize + 4 + // Index length
-		5*(len(b.tableIndex.Offsets)) // approximate index size
+		5*(uint32(len(b.tableIndex.Offsets))) // approximate index size
 
 	return int64(estimateSz) > cap
 }
@@ -266,6 +381,35 @@ func (b *Builder) Finish() []byte {
 
 	b.finishBlock() // This will never start a new block.
 
+	if b.blockChan != nil {
+		close(b.blockChan)
+	}
+	// Wait for the block handlers to finish.
+	b.wg.Wait()
+
+	dst := b.buf
+	// Fix the block boundaries. This includes moving the blocks so that we
+	// don't leave any gaps between them.
+	if len(b.blockList) > 0 {
+		dstLen := uint32(0)
+		for i, bl := range b.blockList {
+			off := b.tableIndex.Offsets[i]
+			// The length of the block is its end minus its start.
+			off.Len = bl.end - bl.start
+			// The new offset of the block is the point in the main buffer up to
+			// which we have written data.
+			off.Offset = dstLen
+
+			copy(dst[dstLen:], b.buf[bl.start:bl.end])
+
+			// The new end is the new offset plus the new length of the block.
+			dstLen = off.Offset + off.Len
+		}
+		// Update b.sz to the end of the valid data, because append and
+		// writeChecksum rely on it.
+		b.sz = dstLen
+	}
+
 	index, err := proto.Marshal(b.tableIndex)
 	y.Check(err)
 
@@ -273,17 +417,12 @@ func (b *Builder) Finish() []byte {
 	if b.shouldEncrypt() {
 		index, err = b.encrypt(index)
 		y.Check(err)
 	}
-	// Write index the file.
-	n, err := b.buf.Write(index)
-	y.Check(err)
-
-	y.AssertTrue(uint32(n) < math.MaxUint32)
-	// Write index size.
-	_, err = b.buf.Write(y.U32ToBytes(uint32(n)))
-	y.Check(err)
+	// Write the index to the buffer.
+	b.append(index)
+	b.append(y.U32ToBytes(uint32(len(index))))
 
 	b.writeChecksum(index)
-	return b.buf.Bytes()
+	return b.buf[:b.sz]
 }
 
 func (b *Builder) writeChecksum(data []byte) {
@@ -304,13 +443,10 @@ func (b *Builder) writeChecksum(data []byte) {
 	// Write checksum to the file.
 	chksum, err := proto.Marshal(&checksum)
 	y.Check(err)
-	n, err := b.buf.Write(chksum)
-	y.Check(err)
+	b.append(chksum)
 
-	y.AssertTrue(uint32(n) < math.MaxUint32)
 	// Write checksum size.
-	_, err = b.buf.Write(y.U32ToBytes(uint32(n)))
-	y.Check(err)
+	b.append(y.U32ToBytes(uint32(len(chksum))))
 }
 
 // DataKey returns datakey of the builder.
@@ -340,14 +476,14 @@ func (b *Builder) shouldEncrypt() bool {
 }
 
 // compressData compresses the given data.
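+// The dst slice is used as scratch space for the output; handleBlock passes a
+// pooled slice here so that each block does not allocate a fresh buffer.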
-func (b *Builder) compressData(data []byte) ([]byte, error) {
+func (b *Builder) compressData(dst, data []byte) ([]byte, error) {
 	switch b.opt.Compression {
 	case options.None:
 		return data, nil
 	case options.Snappy:
-		return snappy.Encode(nil, data), nil
+		return snappy.Encode(dst, data), nil
 	case options.ZSTD:
-		return y.ZSTDCompress(nil, data, b.opt.ZSTDCompressionLevel)
+		return y.ZSTDCompress(dst, data, b.opt.ZSTDCompressionLevel)
 	}
 	return nil, errors.New("Unsupported compression type")
 }
diff --git a/table/builder_test.go b/table/builder_test.go
index 76296562e..5158a159a 100644
--- a/table/builder_test.go
+++ b/table/builder_test.go
@@ -32,30 +32,60 @@ import (
 
 func TestTableIndex(t *testing.T) {
 	rand.Seed(time.Now().Unix())
-	keyPrefix := "key"
-	t.Run("single key", func(t *testing.T) {
-		opts := Options{Compression: options.ZSTD}
-		f := buildTestTable(t, keyPrefix, 1, opts)
-		tbl, err := OpenTable(f, opts)
-		require.NoError(t, err)
-		require.Len(t, tbl.blockIndex, 1)
-	})
+	keysCount := 100000
+	key := make([]byte, 32)
+	_, err := rand.Read(key)
+	require.NoError(t, err)
+	subTest := []struct {
+		name string
+		opts Options
+	}{
+		{
+			name: "No encryption/compression",
+			opts: Options{
+				BlockSize:          4 * 1024,
+				BloomFalsePositive: 0.01,
+				TableSize:          30 << 20,
+			},
+		},
+		{
+			// Encryption mode.
+			name: "Only encryption",
+			opts: Options{
+				BlockSize:          4 * 1024,
+				BloomFalsePositive: 0.01,
+				TableSize:          30 << 20,
+				DataKey:            &pb.DataKey{Data: key},
+			},
+		},
+		{
+			// Compression mode.
+			name: "Only compression",
+			opts: Options{
+				BlockSize:            4 * 1024,
+				BloomFalsePositive:   0.01,
+				TableSize:            30 << 20,
+				Compression:          options.ZSTD,
+				ZSTDCompressionLevel: 3,
+			},
+		},
+		{
+			// Compression mode and encryption.
+			name: "Compression and encryption",
+			opts: Options{
+				BlockSize:            4 * 1024,
+				BloomFalsePositive:   0.01,
+				TableSize:            30 << 20,
+				Compression:          options.ZSTD,
+				ZSTDCompressionLevel: 3,
+				DataKey:              &pb.DataKey{Data: key},
+			},
+		},
+	}
 
-	t.Run("multiple keys", func(t *testing.T) {
-		opts := []Options{}
-		// Normal mode.
-		opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01})
-		// Encryption mode.
-		key := make([]byte, 32)
-		_, err := rand.Read(key)
-		require.NoError(t, err)
-		opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01,
-			DataKey: &pb.DataKey{Data: key}})
-		// Compression mode.
-		opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01,
-			Compression: options.ZSTD})
-		keysCount := 10000
-		for _, opt := range opts {
+	for _, tt := range subTest {
+		t.Run(tt.name, func(t *testing.T) {
+			opt := tt.opts
 			builder := NewTableBuilder(opt)
 			filename := fmt.Sprintf("%s%c%d.sst", os.TempDir(), os.PathSeparator, rand.Uint32())
 			f, err := y.OpenSyncedFile(filename, true)
@@ -81,8 +111,9 @@ func TestTableIndex(t *testing.T) {
 
 			tbl, err := OpenTable(f, opt)
 			require.NoError(t, err, "unable to open table")
+
 			if opt.DataKey == nil {
-				// key id is zero if thre is no datakey.
+				// key id is zero if there is no datakey.
 				require.Equal(t, tbl.KeyID(), uint64(0))
 			}
 
@@ -93,8 +124,8 @@ func TestTableIndex(t *testing.T) {
 			}
 			f.Close()
 			require.NoError(t, os.RemoveAll(filename))
-		}
-	})
+		})
+	}
 }
 
 func TestInvalidCompression(t *testing.T) {
@@ -125,18 +156,21 @@ func BenchmarkBuilder(b *testing.B) {
 
 	keysCount := 1300000 // This number of entries consumes ~64MB of memory.
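+	// Build the key list up front so that key generation is not measured inside
+	// the benchmark loop.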
+	var keyList [][]byte
+	for i := 0; i < keysCount; i++ {
+		keyList = append(keyList, key(i))
+	}
 	bench := func(b *testing.B, opt *Options) {
-		// KeyCount * (keySize + ValSize)
 		b.SetBytes(int64(keysCount) * (32 + 32))
+		opt.BlockSize = 4 * 1024
+		opt.BloomFalsePositive = 0.01
+		opt.TableSize = 5 << 20
+		b.ResetTimer()
+
 		for i := 0; i < b.N; i++ {
-			opt.BlockSize = 4 * 1024
-			opt.BloomFalsePositive = 0.01
 			builder := NewTableBuilder(*opt)
-
-			for i := 0; i < keysCount; i++ {
-				builder.Add(key(i), vs, 0)
+			for j := 0; j < keysCount; j++ {
+				builder.Add(keyList[j], vs, 0)
 			}
-
 			_ = builder.Finish()
 		}
 	}
 
 	b.Run("no compression", func(b *testing.B) {
 		var opt Options
 		opt.Compression = options.None
 		bench(b, &opt)
 	})
+	b.Run("encryption", func(b *testing.B) {
+		var opt Options
+		key := make([]byte, 32)
+		rand.Read(key)
+		opt.DataKey = &pb.DataKey{Data: key}
+		bench(b, &opt)
+	})
 	b.Run("zstd compression", func(b *testing.B) {
 		var opt Options
 		opt.Compression = options.ZSTD
diff --git a/table/iterator.go b/table/iterator.go
index a1e760c78..33a99a8f9 100644
--- a/table/iterator.go
+++ b/table/iterator.go
@@ -67,6 +67,7 @@ func (itr *blockIterator) setIdx(i int) {
 		baseHeader.Decode(itr.data)
 		itr.baseKey = itr.data[headerSize : headerSize+baseHeader.diff]
 	}
+
 	var endOffset int
 	// idx points to the last entry in the block.
 	if itr.idx+1 == len(itr.entryOffsets) {
diff --git a/table/table.go b/table/table.go
index 25227be34..f96ae4359 100644
--- a/table/table.go
+++ b/table/table.go
@@ -49,6 +49,9 @@ const intSize = int(unsafe.Sizeof(int(0)))
 type Options struct {
 	// Options for Opening/Building Table.
 
+	// Maximum size of the table.
+	TableSize uint64
+
 	// ChkMode is the checksum verification mode for Table.
 	ChkMode options.ChecksumVerificationMode
 
@@ -427,7 +430,7 @@ func (t *Table) block(idx int) (*block, error) {
 	// Checksum length greater than block size could happen if the table was compressed and
 	// it was opened with an incorrect compression algorithm (or the data was corrupted).
 	if blk.chkLen > len(blk.data) {
-		return nil, errors.New("invalid checksum length. Either the data is" +
+		return nil, errors.New("invalid checksum length. Either the data is " +
 			"corrupted or the table options are incorrectly set")
 	}
 
diff --git a/table/table_test.go b/table/table_test.go
index 27a4f1d16..46bd94d50 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -35,11 +35,6 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-const (
-	KB = 1024
-	MB = KB * 1024
-)
-
 func key(prefix string, i int) string {
 	return prefix + fmt.Sprintf("%04d", i)
 }
@@ -356,7 +351,7 @@ func TestIterateBackAndForth(t *testing.T) {
 
 	it.seekToFirst()
 	k = it.Key()
-	require.EqualValues(t, key("key", 0), y.ParseKey(k))
+	require.EqualValues(t, key("key", 0), string(y.ParseKey(k)))
 }
 
 func TestUniIterator(t *testing.T) {
@@ -701,7 +696,8 @@ func TestTableBigValues(t *testing.T) {
 	require.NoError(t, err, "unable to create file")
 
 	n := 100 // Insert 100 keys.
-	opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01}
+	opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01,
+		TableSize: uint64(n) * 1 << 20}
 	builder := NewTableBuilder(opts)
 	for i := 0; i < n; i++ {
 		key := y.KeyWithTs([]byte(key("", i)), 0)
diff --git a/y/iterator.go b/y/iterator.go
index 6d0f677c0..7c9b21194 100644
--- a/y/iterator.go
+++ b/y/iterator.go
@@ -64,11 +64,12 @@ func (v *ValueStruct) Decode(b []byte) {
 }
 
 // Encode expects a slice of length at least v.EncodedSize().
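+// It returns the number of bytes written.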
-func (v *ValueStruct) Encode(b []byte) {
+func (v *ValueStruct) Encode(b []byte) uint32 {
 	b[0] = v.Meta
 	b[1] = v.UserMeta
 	sz := binary.PutUvarint(b[2:], v.ExpiresAt)
-	copy(b[2+sz:], v.Value)
+	n := copy(b[2+sz:], v.Value)
+	return uint32(2 + sz + n)
 }
 
 // EncodeTo should be kept in sync with the Encode function above. The reason
@@ -79,6 +80,7 @@ func (v *ValueStruct) EncodeTo(buf *bytes.Buffer) {
 	buf.WriteByte(v.UserMeta)
 	var enc [binary.MaxVarintLen64]byte
 	sz := binary.PutUvarint(enc[:], v.ExpiresAt)
+
 	buf.Write(enc[:sz])
 	buf.Write(v.Value)
 }