diff --git a/hrpc/call.go b/hrpc/call.go
index dce27561..5644a17f 100644
--- a/hrpc/call.go
+++ b/hrpc/call.go
@@ -238,6 +238,16 @@ func cellFromCellBlock(b []byte) (*pb.Cell, uint32, error) {
 
 	value := b[:valueLen]
 
+	// Clone the []byte fields into a single new allocation, so as not to
+	// retain any of the passed in memory. Each field is sliced with its
+	// capacity capped at its own length, so appending to one field of the
+	// returned cell reallocates instead of overwriting the next field.
+	c := make([]byte, len(key)+len(family)+len(qualifier)+len(value))
+	key, c = append(c[:0:len(key)], key...), c[len(key):]
+	family, c = append(c[:0:len(family)], family...), c[len(family):]
+	qualifier, c = append(c[:0:len(qualifier)], qualifier...), c[len(qualifier):]
+	value = append(c[:0:len(value)], value...)
+
 	return &pb.Cell{
 		Row:       key,
 		Family:    family,
diff --git a/hrpc/call_test.go b/hrpc/call_test.go
index ed710e4f..63542fce 100644
--- a/hrpc/call_test.go
+++ b/hrpc/call_test.go
@@ -7,6 +7,7 @@ package hrpc
 
 import (
 	"reflect"
+	"slices"
 	"strconv"
 	"testing"
 
@@ -19,7 +20,9 @@ func TestCellFromCellBlock(t *testing.T) {
 		102, 97, 0, 0, 1, 92, 13, 97, 5, 32, 4, 72, 101, 108, 108, 111, 32, 109, 121, 32,
 		110, 97, 109, 101, 32, 105, 115, 32, 68, 111, 103, 46}
 
-	cell, n, err := cellFromCellBlock(cellblock)
+	clonedCellblock := slices.Clone(cellblock)
+
+	cell, n, err := cellFromCellBlock(clonedCellblock)
 	if err != nil {
 		t.Error(err)
 	}
@@ -40,6 +43,15 @@ func TestCellFromCellBlock(t *testing.T) {
 	if !proto.Equal(expectedCell, cell) {
 		t.Errorf("expected cell %v, got cell %v", expectedCell, cell)
 	}
+	for i := range clonedCellblock {
+		// Modify the values of clonedCellblock, it should not affect
+		// the value of cell.
+		clonedCellblock[i] = 0
+	}
+	if !proto.Equal(expectedCell, cell) {
+		t.Errorf("After modifying input, cell should not have changed. "+
+			"expected %v, got cell %v", expectedCell, cell)
+	}
 
 	// test error cases
 	for i := range cellblock {
diff --git a/hrpc/hrpc_test.go b/hrpc/hrpc_test.go
index 79a10fc2..c6d5da2d 100644
--- a/hrpc/hrpc_test.go
+++ b/hrpc/hrpc_test.go
@@ -1694,3 +1694,29 @@ func BenchmarkMutateToProtoWithNestedMaps(b *testing.B) {
 		}
 	}
 }
+
+// BenchmarkDeserializeCellBlocks measures deserializeCellBlocks on an input
+// holding 1 cell and on an input holding 100 cells.
+func BenchmarkDeserializeCellBlocks(b *testing.B) {
+	b.Run("1", func(b *testing.B) {
+		b.ReportAllocs()
+		for range b.N {
+			res, _, err := deserializeCellBlocks(cellblock, 1)
+			if len(res) != 1 || err != nil {
+				b.Fatalf("len(res): %d err: %v", len(res), err)
+			}
+		}
+	})
+
+	cellblock100 := bytes.Repeat(cellblock, 100)
+	b.Run("100", func(b *testing.B) {
+		b.ReportAllocs()
+		for range b.N {
+			res, _, err := deserializeCellBlocks(cellblock100, 100)
+			if len(res) != 100 || err != nil {
+				b.Fatalf("len(res): %d err: %v", len(res), err)
+			}
+		}
+	})
+
+}
diff --git a/region/client.go b/region/client.go
index 0c334978..ed4e4ac9 100644
--- a/region/client.go
+++ b/region/client.go
@@ -103,12 +103,11 @@ const (
 var bufferPool sync.Pool
 
+// newBuffer returns a zeroed slice of exactly size bytes, reusing the backing
+// array of a pooled buffer when it is large enough. Any length left on the
+// pooled slice is discarded, so the result never starts with stale data.
 func newBuffer(size int) []byte {
-	v := bufferPool.Get()
-	var b []byte
-	if v != nil {
-		b = v.([]byte)
-	}
+	b, _ := bufferPool.Get().([]byte)
 	return append(b[:0], make([]byte, size)...)
 }
 
 func freeBuffer(b []byte) {
@@ -477,7 +476,8 @@ func (c *client) receive(r io.Reader) (err error) {
 	}
 
 	size := binary.BigEndian.Uint32(sz[:])
-	b := make([]byte, size)
+	b := newBuffer(int(size))
+	defer freeBuffer(b)
 
 	_, err = io.ReadFull(r, b)
 	if err != nil {
@@ -546,6 +546,7 @@ func (c *client) receive(r io.Reader) (err error) {
 		b := b[size-cellsLen:]
 		if c.compressor != nil {
 			b, err = c.compressor.decompressCellblocks(b)
+			defer freeBuffer(b)
 			if err != nil {
 				err = RetryableError{fmt.Errorf("failed to decompress the response: %s", err)}
 				return
diff --git a/region/client_test.go b/region/client_test.go
index a66ef773..ce64369f 100644
--- a/region/client_test.go
+++ b/region/client_test.go
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/tsuna/gohbase/compression/snappy"
 	"github.com/tsuna/gohbase/hrpc"
 	"github.com/tsuna/gohbase/pb"
 	"github.com/tsuna/gohbase/test"
@@ -1318,3 +1319,243 @@ func BenchmarkSendScanRequest(b *testing.B) {
 		}
 	})
 }
+
+// makeResponse encodes a response frame: a 4-byte big-endian length, then
+// the varint-delimited header and response messages, then the cellblocks,
+// which are compressed first when compressor is non-nil.
+func makeResponse(callID uint32, response proto.Message, cellblocks []byte,
+	compressor *compressor) []byte {
+	b := make([]byte, 4) // Reserve 4 bytes for size
+	header := &pb.ResponseHeader{
+		CallId: proto.Uint32(callID),
+	}
+	if len(cellblocks) > 0 {
+		if compressor != nil {
+			cellblocks = compressor.compressCellblocks(
+				net.Buffers{cellblocks}, uint32(len(cellblocks)))
+		}
+
+		header.CellBlockMeta = &pb.CellBlockMeta{Length: proto.Uint32(uint32(len(cellblocks)))}
+	}
+
+	b = protowire.AppendVarint(b, uint64(proto.Size(header)))
+	var err error
+	b, err = proto.MarshalOptions{}.MarshalAppend(b, header)
+	if err != nil {
+		panic(err)
+	}
+
+	b = protowire.AppendVarint(b, uint64(proto.Size(response)))
+	b, err = proto.MarshalOptions{}.MarshalAppend(b, response)
+	if err != nil {
+		panic(err)
+	}
+
+	b = append(b, cellblocks...)
+
+	binary.BigEndian.PutUint32(b[:4], uint32(len(b)-4))
+	return b
+}
+
+// cellblockLen returns the number of bytes appendCellblock writes for a
+// cell with the given field lengths.
+func cellblockLen(rowLen, familyLen, qualifierLen, valueLen int) int {
+	keyLength := 2 + rowLen + 1 + familyLen + qualifierLen + 8 + 1
+	keyValueLength := 4 + 4 + keyLength + valueLen
+	return 4 + keyValueLength
+}
+
+// appendCellblock appends the cellblock encoding of one cell to cbs and
+// returns the extended slice. Copied from hrpc/mutate.go.
+func appendCellblock(row []byte, family, qualifier string, value []byte, ts uint64, typ byte,
+	cbs []byte) []byte {
+	// cellblock layout:
+	//
+	// Header:
+	// 4 byte length of key + value
+	// 4 byte length of key
+	// 4 byte length of value
+	//
+	// Key:
+	// 2 byte length of row
+	// <row>
+	// 1 byte length of row family
+	// <family>
+	// <qualifier>
+	// 8 byte timestamp
+	// 1 byte type
+	//
+	// Value:
+	// <value>
+	keylength := 2 + len(row) + 1 + len(family) + len(qualifier) + 8 + 1
+	valuelength := len(value)
+
+	keyvaluelength := 4 + 4 + keylength + valuelength
+	i := len(cbs)
+	cbs = append(cbs, make([]byte,
+		cellblockLen(len(row), len(family), len(qualifier), len(value)))...)
+
+	// Header:
+	binary.BigEndian.PutUint32(cbs[i:], uint32(keyvaluelength))
+	i += 4
+	binary.BigEndian.PutUint32(cbs[i:], uint32(keylength))
+	i += 4
+	binary.BigEndian.PutUint32(cbs[i:], uint32(valuelength))
+	i += 4
+
+	// Key:
+	binary.BigEndian.PutUint16(cbs[i:], uint16(len(row)))
+	i += 2
+	i += copy(cbs[i:], row)
+	cbs[i] = byte(len(family))
+	i++
+	i += copy(cbs[i:], family)
+	i += copy(cbs[i:], qualifier)
+	binary.BigEndian.PutUint64(cbs[i:], ts)
+	i += 8
+	cbs[i] = typ
+	i++
+
+	// Value:
+	copy(cbs[i:], value)
+
+	return cbs
+}
+
+// fakeConn is a stub connection for benchmarks; only SetReadDeadline is
+// implemented, the embedded net.Conn is left nil.
+type fakeConn struct {
+	net.Conn
+}
+
+// SetReadDeadline is a no-op that always succeeds.
+func (fakeConn) SetReadDeadline(t time.Time) error { return nil }
+
+// BenchmarkReceive measures client.receive decoding a single mutate
+// response, a 100-action multi response, and a roughly 2 MiB
+// snappy-compressed scan response.
+func BenchmarkReceive(b *testing.B) {
+	c := client{
+		conn:       fakeConn{},
+		sent:       make(map[uint32]hrpc.Call),
+		compressor: &compressor{snappy.New()},
+	}
+
+	reader := bytes.NewReader(nil)
+	put, err := hrpc.NewPutStr(context.Background(), "table", "key",
+		map[string]map[string][]byte{"cf": {"a": []byte("1")}})
+	if err != nil {
+		b.Fatal(err)
+	}
+	// Simple puts have an empty response
+	resp := makeResponse(1, &pb.MutateResponse{}, nil, nil)
+
+	b.Run("mutate", func(b *testing.B) {
+		b.ReportAllocs()
+		for range b.N {
+			// Set read buffer to the encoded response
+			reader.Reset(resp)
+			// Put the RPC in the sent map so that it can be found by
+			// receive
+			c.sent[1] = put
+			if err := c.receive(reader); err != nil {
+				b.Fatal(err)
+			}
+			// Consume the result so that this same request can be
+			// reused on the next iteration
+			<-put.ResultChan()
+		}
+	})
+
+	multi := newMulti(100)
+	multiResp := &pb.MultiResponse{
+		RegionActionResult: []*pb.RegionActionResult{{}},
+	}
+	calls := make([]hrpc.Call, 0, 100)
+	for i := range 100 {
+		put, err := hrpc.NewPutStr(context.Background(), "table", "key",
+			map[string]map[string][]byte{"cf": {"a": []byte("1")}})
+		if err != nil {
+			b.Fatal(err)
+		}
+		calls = append(calls, put)
+
+		multiResp.RegionActionResult[0].ResultOrException = append(
+			multiResp.RegionActionResult[0].ResultOrException,
+			&pb.ResultOrException{
+				Index:  proto.Uint32(uint32(i + 1)),
+				Result: &pb.Result{},
+			},
+		)
+	}
+
+	multi.add(calls)
+	resp = makeResponse(1, multiResp, nil, nil)
+
+	b.Run("multiMutate100", func(b *testing.B) {
+		b.ReportAllocs()
+		for range b.N {
+			reader.Reset(resp)
+			if multi.len() != 100 {
+				b.Fatalf("unexpected len: %d", multi.len())
+			}
+			c.sent[1] = multi
+			if err := c.receive(reader); err != nil {
+				b.Fatal(err)
+			}
+			// Consume the results so that this same request can
+			// be reused on the next iteration
+			for _, call := range calls {
+				<-call.ResultChan()
+			}
+			// Need to do this on every iteration because
+			// returnResults on a multi resets the fields of multi
+			multi.add(calls)
+		}
+	})
+
+	scan, err := hrpc.NewScanStr(context.Background(), "table")
+	if err != nil {
+		b.Fatal(err)
+	}
+
+	cell := appendCellblock(
+		bytes.Repeat([]byte("0123456789"), 5), // 50-byte key
+		"f",
+		string(bytes.Repeat([]byte("9876543210"), 2)), // 20-byte qualifier
+		bytes.Repeat([]byte("abcdefghij"), 20),        // 200-byte value
+		17356887651735688765,
+		byte(pb.CellType_PUT),
+		nil)
+	const twoMiB = 2 * 1024 * 1024
+	cellCount := twoMiB / len(cell)
+	cells := bytes.Repeat(cell, cellCount)
+
+	scanResponse := &pb.ScanResponse{
+		// TODO: Should this be 1 result with cellCount cells, or
+		// cellCount results each with 1 cell?
+		CellsPerResult:       []uint32{uint32(cellCount)},
+		PartialFlagPerResult: []bool{false},
+	}
+	resp = makeResponse(1, scanResponse, cells, c.compressor)
+	b.Logf("Size of cells: %d Count of cells: %d Size of resp: %d",
+		len(cells), cellCount, len(resp))
+
+	b.Run("scanResult2MBwithCompression", func(b *testing.B) {
+		b.ReportAllocs()
+		for range b.N {
+			// Set read buffer to the encoded response
+			reader.Reset(resp)
+			// Put the RPC in the sent map so that it can be found by
+			// receive
+			c.sent[1] = scan
+			if err := c.receive(reader); err != nil {
+				b.Fatal(err)
+			}
+			// Consume the result so that this same request can be
+			// reused on the next iteration
+			<-scan.ResultChan()
+		}
+
+	})
+}
diff --git a/region/compressor.go b/region/compressor.go
index a71b9ffa..e22e0b44 100644
--- a/region/compressor.go
+++ b/region/compressor.go
@@ -92,7 +92,7 @@ func readUint32(b []byte) (uint32, []byte, error) {
 func (c *compressor) decompressCellblocks(b []byte) ([]byte, error) {
 	var (
 		err                  error
-		out                  []byte
+		out                  = newBuffer(0) // gets freed in receive
 		compressedChunk      []byte
 		compressedChunkLen   uint32
 		uncompressedBlockLen uint32