Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce retained memory use #280

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions hrpc/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ func cellFromCellBlock(b []byte) (*pb.Cell, uint32, error) {

value := b[:valueLen]

// clone []byte values into new slice, so as not to retain any of
// the passed in memory.
c := make([]byte, len(key)+len(family)+len(qualifier)+len(value))
key, c = append(c[:0], key...), c[len(key):]
family, c = append(c[:0], family...), c[len(family):]
qualifier, c = append(c[:0], qualifier...), c[len(qualifier):]
value = append(c[:0], value...)

return &pb.Cell{
Row: key,
Family: family,
Expand Down
14 changes: 13 additions & 1 deletion hrpc/call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package hrpc

import (
"reflect"
"slices"
"strconv"
"testing"

Expand All @@ -19,7 +20,9 @@ func TestCellFromCellBlock(t *testing.T) {
102, 97, 0, 0, 1, 92, 13, 97, 5, 32, 4, 72, 101, 108, 108, 111, 32, 109, 121, 32, 110,
97, 109, 101, 32, 105, 115, 32, 68, 111, 103, 46}

cell, n, err := cellFromCellBlock(cellblock)
clonedCellblock := slices.Clone(cellblock)

cell, n, err := cellFromCellBlock(clonedCellblock)
if err != nil {
t.Error(err)
}
Expand All @@ -40,6 +43,15 @@ func TestCellFromCellBlock(t *testing.T) {
if !proto.Equal(expectedCell, cell) {
t.Errorf("expected cell %v, got cell %v", expectedCell, cell)
}
for i := range clonedCellblock {
// Modify the values of clonedCellblock, it should not affect
// the value of cell.
clonedCellblock[i] = 0
}
if !proto.Equal(expectedCell, cell) {
t.Errorf("After modifying input, cell should not have changed. "+
"expected %v, got cell %v", expectedCell, cell)
}

// test error cases
for i := range cellblock {
Expand Down
24 changes: 24 additions & 0 deletions hrpc/hrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1694,3 +1694,27 @@ func BenchmarkMutateToProtoWithNestedMaps(b *testing.B) {
}
}
}

func BenchmarkDeserializeCellBlocks(b *testing.B) {
b.Run("1", func(b *testing.B) {
b.ReportAllocs()
for range b.N {
res, _, err := deserializeCellBlocks(cellblock, 1)
if len(res) != 1 || err != nil {
b.Fatalf("len(res): %d err: %v", len(res), err)
}
}
})

cellblock100 := bytes.Repeat(cellblock, 100)
b.Run("100", func(b *testing.B) {
b.ReportAllocs()
for range b.N {
res, _, err := deserializeCellBlocks(cellblock100, 100)
if len(res) != 100 || err != nil {
b.Fatalf("len(res): %d err: %v", len(res), err)
}
}
})

}
12 changes: 5 additions & 7 deletions region/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,8 @@ const (
// bufferPool recycles byte slices between calls so that their backing
// arrays can be reused instead of reallocated.
var bufferPool sync.Pool

// newBuffer returns a slice of exactly size zeroed bytes, reusing the backing
// array of a slice taken from bufferPool when one is available.
func newBuffer(size int) []byte {
	// The comma-ok assertion yields a nil slice both when the pool is empty
	// (Get returns nil) and when it holds a value of another type.
	b, _ := bufferPool.Get().([]byte)
	// Truncate before appending so the result has exactly size bytes even if
	// the pooled slice was stored with a non-zero length.
	return append(b[:0], make([]byte, size)...)
}

func freeBuffer(b []byte) {
Expand Down Expand Up @@ -477,7 +473,8 @@ func (c *client) receive(r io.Reader) (err error) {
}

size := binary.BigEndian.Uint32(sz[:])
b := make([]byte, size)
b := newBuffer(int(size))
defer freeBuffer(b)

_, err = io.ReadFull(r, b)
if err != nil {
Expand Down Expand Up @@ -546,6 +543,7 @@ func (c *client) receive(r io.Reader) (err error) {
b := b[size-cellsLen:]
if c.compressor != nil {
b, err = c.compressor.decompressCellblocks(b)
defer freeBuffer(b)
if err != nil {
err = RetryableError{fmt.Errorf("failed to decompress the response: %s", err)}
return
Expand Down
229 changes: 229 additions & 0 deletions region/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/tsuna/gohbase/compression/snappy"
"github.com/tsuna/gohbase/hrpc"
"github.com/tsuna/gohbase/pb"
"github.com/tsuna/gohbase/test"
Expand Down Expand Up @@ -1318,3 +1319,231 @@ func BenchmarkSendScanRequest(b *testing.B) {
}
})
}

// makeResponse builds one encoded response frame for tests. The frame is:
//
//	4 byte big-endian length of everything that follows
//	varint length of the response header, then the marshaled header
//	varint length of response, then the marshaled response
//	cellblocks (possibly empty)
//
// When cellblocks is non-empty its length is recorded in the header's
// CellBlockMeta; if compressor is also non-nil the cellblocks are first passed
// through compressor.compressCellblocks and the recorded length is that of its
// output. It panics if either message fails to marshal.
func makeResponse(callID uint32, response proto.Message, cellblocks []byte,
	compressor *compressor) []byte {
	header := &pb.ResponseHeader{CallId: proto.Uint32(callID)}
	if len(cellblocks) != 0 {
		if compressor != nil {
			cellblocks = compressor.compressCellblocks(
				net.Buffers{cellblocks}, uint32(len(cellblocks)))
		}
		header.CellBlockMeta = &pb.CellBlockMeta{Length: proto.Uint32(uint32(len(cellblocks)))}
	}

	// appendDelimited appends msg to dst, preceded by its size as a varint.
	appendDelimited := func(dst []byte, msg proto.Message) []byte {
		dst = protowire.AppendVarint(dst, uint64(proto.Size(msg)))
		out, err := proto.MarshalOptions{}.MarshalAppend(dst, msg)
		if err != nil {
			panic(err)
		}
		return out
	}

	// The first 4 bytes are a placeholder for the length prefix, which can
	// only be filled in once the rest of the frame has been written.
	frame := make([]byte, 4)
	frame = appendDelimited(frame, header)
	frame = appendDelimited(frame, response)
	frame = append(frame, cellblocks...)

	binary.BigEndian.PutUint32(frame, uint32(len(frame)-4))
	return frame
}

// cellblockLen returns the total number of bytes of one serialized cellblock
// whose row, family, qualifier and value have the given lengths: the three
// 4-byte length fields, the fixed-size parts of the key, and the four
// variable-length parts.
func cellblockLen(rowLen, familyLen, qualifierLen, valueLen int) int {
	const (
		// key+value length, key length and value length, 4 bytes each.
		lengthFields = 4 + 4 + 4
		// 2 byte row length, 1 byte family length, 8 byte timestamp and
		// 1 byte type.
		fixedKeyBytes = 2 + 1 + 8 + 1
	)
	return lengthFields + fixedKeyBytes + rowLen + familyLen + qualifierLen + valueLen
}

// appendCellblock serializes a single cell and appends it to cbs, returning
// the extended slice. Copied from hrpc/mutate.go.
//
// Serialized layout (all integers big-endian):
//
//	Header:
//	  4 byte length of key + value (including the two length fields below)
//	  4 byte length of key
//	  4 byte length of value
//	Key:
//	  2 byte length of row
//	  <row>
//	  1 byte length of family
//	  <family>
//	  <qualifier>
//	  8 byte timestamp
//	  1 byte type
//	Value:
//	  <value>
func appendCellblock(row []byte, family, qualifier string, value []byte, ts uint64, typ byte,
	cbs []byte) []byte {
	keyLen := 2 + len(row) + 1 + len(family) + len(qualifier) + 8 + 1
	valueLen := len(value)
	keyValueLen := 4 + 4 + keyLen + valueLen

	// Grow cbs by the full size of the cell up front, then fill the new
	// region through w, a window that shrinks from the front as fields are
	// written.
	start := len(cbs)
	total := cellblockLen(len(row), len(family), len(qualifier), len(value))
	cbs = append(cbs, make([]byte, total)...)
	w := cbs[start:]

	// Header: three consecutive 4-byte lengths.
	binary.BigEndian.PutUint32(w, uint32(keyValueLen))
	binary.BigEndian.PutUint32(w[4:], uint32(keyLen))
	binary.BigEndian.PutUint32(w[8:], uint32(valueLen))
	w = w[12:]

	// Key: length-prefixed row, length-prefixed family, bare qualifier,
	// timestamp, type.
	binary.BigEndian.PutUint16(w, uint16(len(row)))
	w = w[2:]
	w = w[copy(w, row):]
	w[0] = byte(len(family))
	w = w[1:]
	w = w[copy(w, family):]
	w = w[copy(w, qualifier):]
	binary.BigEndian.PutUint64(w, ts)
	w[8] = typ
	w = w[9:]

	// Value: whatever remains of the window.
	copy(w, value)

	return cbs
}

// fakeConn is a stand-in net.Conn for benchmarks. It embeds the net.Conn
// interface without assigning a value to it, so SetReadDeadline, defined
// below, is the only method that is safe to call on it.
type fakeConn struct {
	net.Conn
}

// SetReadDeadline ignores the deadline and always reports success.
func (fakeConn) SetReadDeadline(time.Time) error {
	return nil
}

// BenchmarkReceive measures client.receive on three pre-encoded responses:
//
//   - "mutate": the empty response to a single put
//   - "multiMutate100": a multi response carrying 100 put results
//   - "scanResult2MBwithCompression": a scan response followed by roughly
//     2 MiB of cellblocks that were run through the client's compressor
//
// Each iteration rewinds a shared reader to the encoded response, registers
// the call under ID 1 in the client's sent map, calls receive, and drains the
// result channel(s) so the same call value can be reused.
func BenchmarkReceive(b *testing.B) {
	const (
		callID  = 1
		numPuts = 100
	)

	reader := bytes.NewReader(nil)
	c := client{
		conn:       fakeConn{},
		sent:       make(map[uint32]hrpc.Call),
		compressor: &compressor{snappy.New()},
	}

	// newValues returns a fresh single-cell value map for a put.
	newValues := func() map[string]map[string][]byte {
		return map[string]map[string][]byte{"cf": {"a": []byte("1")}}
	}

	// receiveOnce points the reader at frame, registers call so that
	// receive can find it, and runs receive once.
	receiveOnce := func(b *testing.B, frame []byte, call hrpc.Call) {
		reader.Reset(frame)
		c.sent[callID] = call
		if err := c.receive(reader); err != nil {
			b.Fatal(err)
		}
	}

	put, err := hrpc.NewPutStr(context.Background(), "table", "key", newValues())
	if err != nil {
		b.Fatal(err)
	}
	// Simple puts have an empty response
	putFrame := makeResponse(callID, &pb.MutateResponse{}, nil, nil)

	b.Run("mutate", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			receiveOnce(b, putFrame, put)
			// Consume the result so that this same request can be
			// reused on the next iteration
			<-put.ResultChan()
		}
	})

	multi := newMulti(numPuts)
	actionResult := &pb.RegionActionResult{}
	calls := make([]hrpc.Call, 0, numPuts)
	for i := 0; i < numPuts; i++ {
		p, err := hrpc.NewPutStr(context.Background(), "table", "key", newValues())
		if err != nil {
			b.Fatal(err)
		}
		calls = append(calls, p)

		actionResult.ResultOrException = append(actionResult.ResultOrException,
			&pb.ResultOrException{
				Index:  proto.Uint32(uint32(i + 1)),
				Result: &pb.Result{},
			})
	}
	multiResp := &pb.MultiResponse{
		RegionActionResult: []*pb.RegionActionResult{actionResult},
	}

	multi.add(calls)
	multiFrame := makeResponse(callID, multiResp, nil, nil)

	b.Run("multiMutate100", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			if multi.len() != numPuts {
				b.Fatalf("unexpected len: %d", multi.len())
			}
			receiveOnce(b, multiFrame, multi)
			// Consume the results so that this same request can
			// be reused on the next iteration
			for _, call := range calls {
				<-call.ResultChan()
			}
			// Need to do this on every iteration because
			// returnResults on a multi resets the fields of multi
			multi.add(calls)
		}
	})

	scan, err := hrpc.NewScanStr(context.Background(), "table")
	if err != nil {
		b.Fatal(err)
	}

	cell := appendCellblock(
		bytes.Repeat([]byte("0123456789"), 5), // 50-byte key
		"f",
		string(bytes.Repeat([]byte("9876543210"), 2)), // 20-byte qualifier
		bytes.Repeat([]byte("abcdefghij"), 20),        // 200-byte value
		17356887651735688765,
		byte(pb.CellType_PUT),
		nil)
	// Fill about 2 MiB with as many whole copies of cell as fit.
	const twoMiB = 2 * 1024 * 1024
	cellCount := twoMiB / len(cell)
	cells := bytes.Repeat(cell, cellCount)

	scanResponse := &pb.ScanResponse{
		// TODO: Should this be 1 result with cellCount cells, or
		// cellCount results each with 1 cell?
		CellsPerResult:       []uint32{uint32(cellCount)},
		PartialFlagPerResult: []bool{false},
	}
	scanFrame := makeResponse(callID, scanResponse, cells, c.compressor)
	b.Logf("Size of cells: %d Count of cells: %d Size of resp: %d",
		len(cells), cellCount, len(scanFrame))

	b.Run("scanResult2MBwithCompression", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			receiveOnce(b, scanFrame, scan)
			// Consume the result so that this same request can be
			// reused on the next iteration
			<-scan.ResultChan()
		}
	})
}
2 changes: 1 addition & 1 deletion region/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func readUint32(b []byte) (uint32, []byte, error) {
func (c *compressor) decompressCellblocks(b []byte) ([]byte, error) {
var (
err error
out []byte
out = newBuffer(0) // gets freed in receive
compressedChunk []byte
compressedChunkLen uint32
uncompressedBlockLen uint32
Expand Down