*: Support required rows for arrow decode format. #12613

Merged Nov 5, 2019 (55 commits)
Changes from 53 commits
Commits (55)
a6405ca
add_encode_type_flag
wshwsh12 Oct 8, 2019
1c2bcd2
tidy mod
wshwsh12 Oct 8, 2019
1138b8e
Merge remote-tracking branch 'upstream/master' into add_encode_type_flag
wshwsh12 Oct 8, 2019
10d6dcf
fix
wshwsh12 Oct 8, 2019
d0b5799
Merge branch 'master' into add_encode_type_flag
wshwsh12 Oct 8, 2019
13d22ad
temp
wshwsh12 Oct 8, 2019
95e3d09
Merge branch 'master' into add_encode_type_flag
sre-bot Oct 9, 2019
6ebd96c
Merge branch 'master' into add_encode_type_flag
wshwsh12 Oct 9, 2019
32c7bfe
requested_rows
wshwsh12 Oct 9, 2019
b3fd5ab
Merge remote-tracking branch 'origin/add_encode_type_flag' into reque…
wshwsh12 Oct 9, 2019
5f754d6
Merge remote-tracking branch 'upstream/master' into requested_rows
wshwsh12 Oct 9, 2019
ec06f30
fix
wshwsh12 Oct 10, 2019
3e014b9
Merge remote-tracking branch 'upstream/master' into requested_rows
wshwsh12 Oct 10, 2019
b0f5105
adjust code
wshwsh12 Oct 10, 2019
23149ca
remove useless check
wshwsh12 Oct 11, 2019
c5ac8b2
temp
wshwsh12 Oct 12, 2019
0743aa2
fix
wshwsh12 Oct 14, 2019
cbe252f
Merge remote-tracking branch 'upstream/master' into requested_rows
wshwsh12 Oct 14, 2019
1cf0853
fix
wshwsh12 Oct 14, 2019
d72d5af
fix
wshwsh12 Oct 14, 2019
97358b9
Merge remote-tracking branch 'upstream/master' into requested_rows
wshwsh12 Oct 14, 2019
9a86bd6
address comments
wshwsh12 Oct 18, 2019
d4b77aa
Merge remote-tracking branch 'upstream/master' into requested_rows
wshwsh12 Oct 18, 2019
38cbf7c
address comments
wshwsh12 Oct 22, 2019
ef9b412
Merge remote-tracking branch 'upstream/master' into requested_rows
wshwsh12 Oct 22, 2019
6161ccf
comments
wshwsh12 Oct 22, 2019
75975b2
s/arrowdecoder /decoder
wshwsh12 Oct 22, 2019
7574eb2
Merge remote-tracking branch 'upstream/master' into requested_rows
wshwsh12 Oct 22, 2019
e606377
address comments
wshwsh12 Oct 22, 2019
df40cd6
address comments
wshwsh12 Oct 23, 2019
6931b70
Merge remote-tracking branch 'upstream/master' into requested_rows
wshwsh12 Oct 23, 2019
9cc34e7
adjust
wshwsh12 Oct 23, 2019
a359a38
fix typo
wshwsh12 Oct 28, 2019
5e01b66
fix comments
wshwsh12 Oct 28, 2019
18ff275
Merge remote-tracking branch 'upstream/master' into requested_rows
wshwsh12 Oct 28, 2019
b7d94dc
magic 0.8
wshwsh12 Oct 28, 2019
99bb831
Merge remote-tracking branch 'upstream/master' into requested_rows
wshwsh12 Oct 28, 2019
2788a89
fix ci
wshwsh12 Oct 29, 2019
650c96a
add comment
wshwsh12 Oct 29, 2019
a1e92e7
add benchmark
wshwsh12 Oct 29, 2019
bc610a1
address comments
wshwsh12 Oct 30, 2019
1f782b1
Merge branch 'master' into requested_rows
wshwsh12 Oct 30, 2019
0c4e14f
Merge branch 'master' into requested_rows
wshwsh12 Nov 1, 2019
1c607bd
Merge branch 'master' into requested_rows
wshwsh12 Nov 1, 2019
cc243d9
add big_response benchmark
wshwsh12 Nov 4, 2019
5f2aa80
temp
wshwsh12 Nov 4, 2019
774d633
benchmark
wshwsh12 Nov 4, 2019
5223d23
Merge remote-tracking branch 'upstream/master' into requested_rows
wshwsh12 Nov 4, 2019
08eab3f
adjust
wshwsh12 Nov 4, 2019
3c7b3aa
fix ci
wshwsh12 Nov 5, 2019
bc37593
benchmark
wshwsh12 Nov 5, 2019
f3fadc6
teardownsuite
wshwsh12 Nov 5, 2019
9a75aa7
Merge remote-tracking branch 'upstream/master' into requested_rows
wshwsh12 Nov 5, 2019
2d3ccfc
set chunk size
wshwsh12 Nov 5, 2019
26b5fda
Merge branch 'master' into requested_rows
sre-bot Nov 5, 2019
220 changes: 107 additions & 113 deletions distsql/distsql_test.go
@@ -25,7 +25,7 @@ import (
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
@@ -127,6 +127,7 @@ func (s *testSuite) TestSelectMemTracker(c *C) {
}

func (s *testSuite) TestSelectNormalChunkSize(c *C) {
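// This test asserts default-path chunk sizes, so pin the default encode type.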
s.sctx.GetSessionVars().EnableArrow = false
response, colTypes := s.createSelectNormal(100, 1000000, c, nil)
response.Fetch(context.TODO())
s.testChunkSize(response, colTypes, c)
@@ -288,6 +289,7 @@ func (s *testSuite) testChunkSize(response SelectResult, colTypes []*types.Field
}

func (s *testSuite) TestAnalyze(c *C) {
s.sctx.GetSessionVars().EnableArrow = false
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetAnalyzeRequest(&tipb.AnalyzeReq{}).
SetKeepOrder(true).
@@ -313,6 +315,7 @@ func (s *testSuite) TestAnalyze(c *C) {
}

func (s *testSuite) TestChecksum(c *C) {
s.sctx.GetSessionVars().EnableArrow = false
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetChecksumRequest(&tipb.ChecksumRequest{}).
Build()
@@ -342,6 +345,7 @@ type mockResponse struct {
count int
total int
batch int
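// ctx exposes the session vars so the mock can match the negotiated encode type (see enableTypeArrow below).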
ctx sessionctx.Context
sync.Mutex
}

@@ -365,20 +369,50 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
numRows := mathutil.Min(resp.batch, resp.total-resp.count)
resp.count += numRows

datum := types.NewIntDatum(1)
bytes := make([]byte, 0, 100)
bytes, _ = codec.EncodeValue(nil, bytes, datum, datum, datum, datum)
chunks := make([]tipb.Chunk, numRows)
for i := range chunks {
chkData := make([]byte, len(bytes))
copy(chkData, bytes)
chunks[i] = tipb.Chunk{RowsData: chkData}
var chunks []tipb.Chunk
if !enableTypeArrow(resp.ctx) {
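// Default encoding: one tipb.Chunk per row, each holding four datum-encoded int64s.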
datum := types.NewIntDatum(1)
bytes := make([]byte, 0, 100)
bytes, _ = codec.EncodeValue(nil, bytes, datum, datum, datum, datum)
chunks = make([]tipb.Chunk, numRows)
for i := range chunks {
chkData := make([]byte, len(bytes))
copy(chkData, bytes)
chunks[i] = tipb.Chunk{RowsData: chkData}
}
} else {
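// Arrow encoding: rows are batched column-wise into chunks of at most 1024 rows each.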
chunks = make([]tipb.Chunk, 0)
for numRows > 0 {
rows := mathutil.Min(numRows, 1024)
numRows -= rows

colTypes := make([]*types.FieldType, 4)
for i := 0; i < 4; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
}
chk := chunk.New(colTypes, numRows, numRows)

for rowOrdinal := 0; rowOrdinal < rows; rowOrdinal++ {
for colOrdinal := 0; colOrdinal < 4; colOrdinal++ {
chk.AppendInt64(colOrdinal, 123)
}
}

codec := chunk.NewCodec(colTypes)
buffer := codec.Encode(chk)
chunks = append(chunks, tipb.Chunk{RowsData: buffer})
}
}

respPB := &tipb.SelectResponse{
Chunks: chunks,
OutputCounts: []int64{1},
}
if enableTypeArrow(resp.ctx) {
respPB.EncodeType = tipb.EncodeType_TypeArrow
} else {
respPB.EncodeType = tipb.EncodeType_TypeDefault
}
respBytes, err := respPB.Marshal()
if err != nil {
panic(err)
@@ -407,125 +441,85 @@ func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) }
// RespTime implements kv.ResultSubset interface.
func (r *mockResultSubset) RespTime() time.Duration { return 0 }

func populateBuffer() []byte {
numCols := 4
numRows := 1024
buffer := make([]byte, 0, 1024)
sc := &stmtctx.StatementContext{TimeZone: time.Local}

for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {
buffer, _ = codec.EncodeValue(sc, buffer, types.NewIntDatum(123))
}
}

return buffer
}

func mockReadRowsData(buffer []byte, colTypes []*types.FieldType, chk *chunk.Chunk) (err error) {
chk.Reset()
numCols := 4
numRows := 1024

decoder := codec.NewDecoder(chk, time.Local)
for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {
buffer, err = decoder.DecodeOne(buffer, colOrdinal, colTypes[colOrdinal])
if err != nil {
return err
}
}
}

return nil
}

func BenchmarkReadRowsData(b *testing.B) {
numCols := 4
numRows := 1024

colTypes := make([]*types.FieldType, numCols)
for i := 0; i < numCols; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
}
chk := chunk.New(colTypes, numRows, numRows)

buffer := populateBuffer()

b.ResetTimer()
for i := 0; i < b.N; i++ {
mockReadRowsData(buffer, colTypes, chk)
}
}

func BenchmarkDecodeToChunk(b *testing.B) {
numCols := 4
numRows := 1024
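// createSelectNormal builds a selectResult backed by mockResponse with the given batch size and total row count.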
func createSelectNormal(batch, totalRows int, ctx sessionctx.Context) (*selectResult, []*types.FieldType) {
request, _ := (&RequestBuilder{}).SetKeyRanges(nil).
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetMemTracker(memory.NewTracker(stringutil.StringerStr("testSuite.createSelectNormal"),
ctx.GetSessionVars().MemQuotaDistSQL)).
Build()

colTypes := make([]*types.FieldType, numCols)
for i := 0; i < numCols; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
// 4 int64 types.
colTypes := []*types.FieldType{
{
Tp: mysql.TypeLonglong,
Flen: mysql.MaxIntWidth,
Decimal: 0,
Flag: mysql.BinaryFlag,
Charset: charset.CharsetBin,
Collate: charset.CollationBin,
},
}
chk := chunk.New(colTypes, numRows, numRows)
colTypes = append(colTypes, colTypes[0])
colTypes = append(colTypes, colTypes[0])
colTypes = append(colTypes, colTypes[0])

for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {
chk.AppendInt64(colOrdinal, 123)
}
}
// Test Next.
var response SelectResult
response, _ = Select(context.TODO(), ctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))

codec := chunk.NewCodec(colTypes)
buffer := codec.Encode(chk)
result, _ := response.(*selectResult)
resp, _ := result.resp.(*mockResponse)
resp.total = totalRows
resp.batch = batch

b.ResetTimer()
for i := 0; i < b.N; i++ {
codec.DecodeToChunk(buffer, chk)
}
return result, colTypes
}

func BenchmarkReadRowsDataWithNewChunk(b *testing.B) {
numCols := 4
numRows := 1024

colTypes := make([]*types.FieldType, numCols)
for i := 0; i < numCols; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
}
buffer := populateBuffer()

b.ResetTimer()
func BenchmarkSelectResponseChunk_BigResponse(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
chk := chunk.New(colTypes, numRows, numRows)
s := &testSuite{}
s.SetUpSuite(nil)
selectResult, colTypes := createSelectNormal(4000, 20000, s.sctx)
selectResult.Fetch(context.TODO())
chk := chunk.NewChunkWithCapacity(colTypes, 1024)
b.StartTimer()
mockReadRowsData(buffer, colTypes, chk)
}
}

func BenchmarkDecodeToChunkWithNewChunk(b *testing.B) {
numCols := 4
numRows := 1024

colTypes := make([]*types.FieldType, numCols)
for i := 0; i < numCols; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
}
chk := chunk.New(colTypes, numRows, numRows)

for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {
chk.AppendInt64(colOrdinal, 123)
for true {
err := selectResult.Next(context.TODO(), chk)
if err != nil {
panic(err)
}
if chk.NumRows() == 0 {
break
}
chk.Reset()
}
s.TearDownSuite(nil)
}
}

codec := chunk.NewCodec(colTypes)
buffer := codec.Encode(chk)

b.ResetTimer()
func BenchmarkSelectResponseChunk_SmallResponse(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
chk := chunk.New(colTypes, numRows, numRows)
s := &testSuite{}
s.SetUpSuite(nil)
selectResult, colTypes := createSelectNormal(32, 3200, s.sctx)
selectResult.Fetch(context.TODO())
chk := chunk.NewChunkWithCapacity(colTypes, 1024)
b.StartTimer()
codec.DecodeToChunk(buffer, chk)
for true {
err := selectResult.Next(context.TODO(), chk)
if err != nil {
panic(err)
}
if chk.NumRows() == 0 {
break
}
chk.Reset()
}
s.TearDownSuite(nil)
}
}
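The two benchmarks above exercise the full SelectResult decode loop for big responses (4000-row batches, 20000 rows total) and small responses (32-row batches, 3200 rows total). Assuming a standard Go toolchain checkout of the repository, a typical invocation would be:

go test -run '^$' -bench SelectResponseChunk -benchmem ./distsql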
2 changes: 2 additions & 0 deletions distsql/request_builder_test.go
@@ -60,6 +60,7 @@ func (s *testSuite) SetUpSuite(c *C) {
ctx.Store = &mock.Store{
Client: &mock.Client{
MockResponse: &mockResponse{
ctx: ctx,
batch: 1,
total: 2,
},
@@ -77,6 +78,7 @@ func (s *testSuite) SetUpTest(c *C) {
store := ctx.Store.(*mock.Store)
store.Client = &mock.Client{
MockResponse: &mockResponse{
ctx: ctx,
batch: 1,
total: 2,
},
44 changes: 37 additions & 7 deletions distsql/select_result.go
@@ -66,9 +66,10 @@ type selectResult struct {
fieldTypes []*types.FieldType
ctx sessionctx.Context

selectResp *tipb.SelectResponse
selectRespSize int // record the selectResp.Size() when it is initialized.
respChkIdx int
selectResp *tipb.SelectResponse
selectRespSize int // record the selectResp.Size() when it is initialized.
respChkIdx int
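// respChunkDecoder is built lazily in readFromArrow and carries leftover decoded rows across Next calls.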
respChunkDecoder *chunk.Decoder

feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
@@ -183,10 +184,39 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er
}

func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error {
rowBatchData := r.selectResp.Chunks[r.respChkIdx].RowsData
codec := chunk.NewCodec(r.fieldTypes)
_ = codec.DecodeToChunk(rowBatchData, chk)
Review comment (Contributor): Should we change DecodeToChunk to unexported after this PR?

r.respChkIdx++
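// Build the decoder once and reuse it across calls; Reset re-points it at the next chunk's raw data.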
if r.respChunkDecoder == nil {
r.respChunkDecoder = chunk.NewDecoder(
chunk.NewChunkWithCapacity(r.fieldTypes, 0),
r.fieldTypes,
)
}

for !chk.IsFull() {
if r.respChkIdx == len(r.selectResp.Chunks) {
err := r.getSelectResp()
if err != nil || r.selectResp == nil {
return err
}
}

if r.respChunkDecoder.IsFinished() {
r.respChunkDecoder.Reset(r.selectResp.Chunks[r.respChkIdx].RowsData)
}
// If the next chunk's size is greater than required rows * 0.8, reuse the memory of the next chunk and return
// immediately. Otherwise, splice the data into one chunk and wait for the next chunk.
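// For example, with chk.RequiredRows() == 1024, the reuse threshold is int(1024*0.8) = 819 rows.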
if r.respChunkDecoder.RemainedRows() > int(float64(chk.RequiredRows())*0.8) {
if chk.NumRows() > 0 {
return nil
}
r.respChunkDecoder.ReuseIntermChk(chk)
r.respChkIdx++
return nil
}
r.respChunkDecoder.Decode(chk)
if r.respChunkDecoder.IsFinished() {
r.respChkIdx++
}
}
return nil
}
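
For reference, here is a minimal self-contained sketch of the reuse-vs-splice heuristic above. The function decideArrowChunk and its parameter names are hypothetical stand-ins rather than the real chunk.Decoder API; only the 0.8 threshold logic mirrors the diff.

package main

import "fmt"

// decideArrowChunk mirrors the branch in readFromArrow: a pending decoded
// chunk that still holds more than 80% of the rows the caller asked for is
// handed over by reference (no copy); anything smaller is spliced into the
// caller's chunk and decoding continues.
func decideArrowChunk(remainedRows, requiredRows, rowsAlreadyInChk int) string {
	if remainedRows > int(float64(requiredRows)*0.8) {
		if rowsAlreadyInChk > 0 {
			// Already-spliced rows are returned first; the big chunk
			// is left for the next call.
			return "return spliced rows now"
		}
		return "reuse decoded chunk memory"
	}
	return "splice and keep decoding"
}

func main() {
	fmt.Println(decideArrowChunk(900, 1024, 0))  // reuse decoded chunk memory (900 > 819)
	fmt.Println(decideArrowChunk(500, 1024, 0))  // splice and keep decoding
	fmt.Println(decideArrowChunk(900, 1024, 50)) // return spliced rows now
}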
