*: encode the returned data using the Chunk format in mocktikv #12023

Merged (46 commits, Sep 28, 2019)

Changes from 41 commits

Commits
4470f2c
change_chunk
wshwsh12 Sep 3, 2019
4f3c090
add performance test code
wshwsh12 Sep 4, 2019
b57c1c7
add benchmark with new chunk
wshwsh12 Sep 5, 2019
6d34738
fix ci
wshwsh12 Sep 6, 2019
0fddb75
remove some ad-hoc code
wshwsh12 Sep 6, 2019
8547dea
reuse mem
wshwsh12 Sep 9, 2019
818f590
add TODO
wshwsh12 Sep 10, 2019
d841cc6
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 12, 2019
17389b3
check returned data is default/arrow format
wshwsh12 Sep 16, 2019
3c0505e
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 16, 2019
c42e3ba
remove useless var
wshwsh12 Sep 16, 2019
ae73a0c
remove useless var
wshwsh12 Sep 16, 2019
41d2abb
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 16, 2019
464b816
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 17, 2019
6808fc2
address comments
wshwsh12 Sep 18, 2019
4ea9b85
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 18, 2019
32e4583
address comment
wshwsh12 Sep 18, 2019
2758e02
add a metric
wshwsh12 Sep 18, 2019
b96379b
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 18, 2019
8eb59bd
remove decodeType
wshwsh12 Sep 18, 2019
64a88e0
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 18, 2019
324ad76
fix
wshwsh12 Sep 19, 2019
d6d5362
Merge branch 'master' into change_chunk
wshwsh12 Sep 19, 2019
2892c8a
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 22, 2019
64f2bd3
revert mem reuse
wshwsh12 Sep 23, 2019
4b0bc99
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 23, 2019
56e85b7
address comments
wshwsh12 Sep 24, 2019
2deb7c5
address comment
wshwsh12 Sep 24, 2019
be50349
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 24, 2019
9b2d50e
adjust config
wshwsh12 Sep 24, 2019
81a5c13
adjust metrics
wshwsh12 Sep 24, 2019
a181642
adjust metric
wshwsh12 Sep 25, 2019
08a512d
adjust metric
wshwsh12 Sep 25, 2019
59d8619
remove metrics
wshwsh12 Sep 25, 2019
f057d09
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 25, 2019
0f9662b
remove useless checks
wshwsh12 Sep 25, 2019
aa8dc9d
adjust interface Fetch and add session vars
wshwsh12 Sep 25, 2019
a86f1c6
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 25, 2019
feb2cd0
change comment
wshwsh12 Sep 26, 2019
07e9804
fix comment
wshwsh12 Sep 26, 2019
a32153d
add comment
wshwsh12 Sep 26, 2019
96a7951
address comments
wshwsh12 Sep 26, 2019
392ec3c
Merge remote-tracking branch 'upstream/master' into change_chunk
wshwsh12 Sep 26, 2019
388fed5
Merge branch 'master' into change_chunk
sre-bot Sep 27, 2019
7f67e84
change default value
wshwsh12 Sep 27, 2019
26b704b
Merge branch 'master' into change_chunk
zz-jason Sep 28, 2019
4 changes: 4 additions & 0 deletions config/config.go
@@ -287,6 +287,8 @@ type TiKVClient struct {
MaxBatchWaitTime time.Duration `toml:"max-batch-wait-time" json:"max-batch-wait-time"`
// BatchWaitSize is the max wait size for batch.
BatchWaitSize uint `toml:"batch-wait-size" json:"batch-wait-size"`
// EnableArrow indicates whether the returned data is encoded in the Arrow (Chunk) format.
EnableArrow bool `toml:"enable-arrow" json:"enable-arrow"`
}

// Binlog is the config for binlog.
@@ -411,6 +413,8 @@ var defaultConf = Config{
OverloadThreshold: 200,
MaxBatchWaitTime: 0,
BatchWaitSize: 8,

EnableArrow: true,
},
Binlog: Binlog{
WriteTimeout: "15s",
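For reference, a minimal sketch (not part of this diff) of how other code can consult the new switch through the global config; it mirrors the read that NewSessionVars performs later in this PR:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/config"
)

func main() {
	// EnableArrow defaults to true, so coprocessor responses are requested
	// in the Chunk (Arrow) format unless the operator turns it off.
	conf := config.GetGlobalConfig()
	fmt.Println("enable-arrow:", conf.TiKVClient.EnableArrow)
}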
3 changes: 3 additions & 0 deletions config/config.toml.example
@@ -285,6 +285,9 @@ max-batch-wait-time = 0
# Batch wait size, to avoid waiting too long.
batch-wait-size = 8

# Enable the Chunk (Arrow) encoding for data returned by the coprocessor.
enable-arrow = true

[txn-local-latches]
# Enable local latches for transactions. Enable it when
# there are lots of conflicts between transactions.
7 changes: 4 additions & 3 deletions ddl/reorg.go
@@ -220,7 +220,7 @@ func constructLimitPB(count uint64) *tipb.Executor {
return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec}
}

func buildDescTableScanDAG(startTS uint64, tbl table.PhysicalTable, columns []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) {
func buildDescTableScanDAG(ctx sessionctx.Context, startTS uint64, tbl table.PhysicalTable, columns []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = startTS
_, timeZoneOffset := time.Now().In(time.UTC).Zone()
@@ -234,6 +234,7 @@ func buildDescTableScanDAG(startTS uint64, tbl table.PhysicalTable, columns []*m
tblScanExec := constructDescTableScanPB(tbl.GetPhysicalID(), pbColumnInfos)
dagReq.Executors = append(dagReq.Executors, tblScanExec)
dagReq.Executors = append(dagReq.Executors, constructLimitPB(limit))
distsql.SetEncodeType(ctx, dagReq)
return dagReq, nil
}

@@ -247,7 +248,8 @@ func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType {

// buildDescTableScan builds a desc table scan upon tblInfo.
func (d *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl table.PhysicalTable, columns []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) {
dagPB, err := buildDescTableScanDAG(startTS, tbl, columns, limit)
sctx := newContext(d.store)
dagPB, err := buildDescTableScanDAG(sctx, startTS, tbl, columns, limit)
if err != nil {
return nil, errors.Trace(err)
}
@@ -266,7 +268,6 @@ func (d *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl tab
return nil, errors.Trace(err)
}

sctx := newContext(d.store)
result, err := distsql.Select(ctx, sctx, kvReq, getColumnsTypes(columns), statistics.NewQueryFeedback(0, nil, 0, false))
if err != nil {
return nil, errors.Trace(err)
54 changes: 42 additions & 12 deletions distsql/distsql.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tipb/go-tipb"
)

// XAPI error codes.
@@ -74,6 +75,10 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
feedback: fb,
}, nil
}
encodeType := tipb.EncodeType_TypeDefault
if enableTypeArrow(sctx) {
encodeType = tipb.EncodeType_TypeArrow
}
return &selectResult{
label: "dag",
resp: resp,
Expand All @@ -85,6 +90,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
feedback: fb,
sqlType: label,
memTracker: kvReq.MemTracker,
encodeType: encodeType,
}, nil
}

@@ -115,12 +121,13 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.
label = metrics.LblInternal
}
result := &selectResult{
label: "analyze",
resp: resp,
results: make(chan resultWithErr, kvReq.Concurrency),
closed: make(chan struct{}),
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: label,
label: "analyze",
resp: resp,
results: make(chan resultWithErr, kvReq.Concurrency),
closed: make(chan struct{}),
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: label,
encodeType: tipb.EncodeType_TypeDefault,
}
return result, nil
}
@@ -132,12 +139,35 @@ func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv
return nil, errors.New("client returns nil response")
}
result := &selectResult{
label: "checksum",
resp: resp,
results: make(chan resultWithErr, kvReq.Concurrency),
closed: make(chan struct{}),
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: metrics.LblGeneral,
label: "checksum",
resp: resp,
results: make(chan resultWithErr, kvReq.Concurrency),
closed: make(chan struct{}),
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: metrics.LblGeneral,
encodeType: tipb.EncodeType_TypeDefault,
}
return result, nil
}

// SetEncodeType sets the encoding method for the DAGRequest. The supported encoding
// methods are:
// 1. TypeArrow: the result is encoded using the Chunk format, see util/chunk/chunk.go
// 2. TypeDefault: the result is encoded row by row
func SetEncodeType(ctx sessionctx.Context, dagReq *tipb.DAGRequest) {
if enableTypeArrow(ctx) {
dagReq.EncodeType = tipb.EncodeType_TypeArrow
} else {
dagReq.EncodeType = tipb.EncodeType_TypeDefault
}
}

func enableTypeArrow(ctx sessionctx.Context) bool {
if !ctx.GetSessionVars().EnableArrow {
return false
}
if ctx.GetSessionVars().EnableStreaming {
return false
}
return true
}
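To make the intended call pattern explicit: every place that builds a tipb.DAGRequest calls SetEncodeType right before returning, so the negotiated encoding travels inside the request itself. A hedged sketch (buildDAGReq is a hypothetical helper mirroring what ddl/reorg.go and executor/admin.go do in this diff):

import (
	"github.com/pingcap/tidb/distsql"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tipb/go-tipb"
)

// buildDAGReq is a hypothetical DAG builder, simplified for illustration.
func buildDAGReq(sctx sessionctx.Context) *tipb.DAGRequest {
	dagReq := &tipb.DAGRequest{}
	// ... append executors (table scan, limit, ...) here ...
	// TypeArrow is chosen unless streaming is enabled or the switch is off.
	distsql.SetEncodeType(sctx, dagReq)
	return dagReq
}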
47 changes: 47 additions & 0 deletions distsql/distsql_test.go
@@ -482,3 +482,50 @@ func BenchmarkDecodeToChunk(b *testing.B) {
codec.DecodeToChunk(buffer, chk)
}
}

func BenchmarkReadRowsDataWithNewChunk(b *testing.B) {
numCols := 4
numRows := 1024

colTypes := make([]*types.FieldType, numCols)
for i := 0; i < numCols; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
}
buffer := populateBuffer()

b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
chk := chunk.New(colTypes, numRows, numRows)
b.StartTimer()
mockReadRowsData(buffer, colTypes, chk)
}
}

func BenchmarkDecodeToChunkWithNewChunk(b *testing.B) {
numCols := 4
numRows := 1024

colTypes := make([]*types.FieldType, numCols)
for i := 0; i < numCols; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
}
chk := chunk.New(colTypes, numRows, numRows)

for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {
chk.AppendInt64(colOrdinal, 123)
}
}

codec := chunk.NewCodec(colTypes)
buffer := codec.Encode(chk)

b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
chk := chunk.New(colTypes, numRows, numRows)
b.StartTimer()
codec.DecodeToChunk(buffer, chk)
}
}
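These benchmarks can be run with the standard Go tooling, for example go test -run='^$' -bench=Chunk ./distsql/ (the -bench pattern here is only an illustration that happens to match all three), to compare the Chunk decode path against the row-based one.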
35 changes: 33 additions & 2 deletions distsql/select_result.go
@@ -73,6 +73,7 @@ type selectResult struct {
feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
sqlType string
encodeType tipb.EncodeType

// copPlanIDs contains all copTasks' planIDs,
// which help to collect copTasks' runtime stats.
@@ -145,8 +146,30 @@ func (r *selectResult) NextRaw(ctx context.Context) (data []byte, err error) {
// Next reads data to the chunk.
func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
// Check whether the returned data is in the default or the Arrow format.
if r.selectResp == nil || (len(r.selectResp.RowBatchData) == 0 && r.respChkIdx == len(r.selectResp.Chunks)) {
err := r.getSelectResp()
if err != nil || r.selectResp == nil {
return err
}
// TODO(Shenghui Wu): add metrics
if len(r.selectResp.RowBatchData) == 0 {
r.encodeType = tipb.EncodeType_TypeDefault
[inline review]
Member: IMHO, we only need to do this check when the Next() function is called the first time. All the cop responses should have the same decode method. Correct me if I'm wrong.
Contributor (author): You are right.
Contributor (author): For now the check is a temporary, ad-hoc one. The next PR will add a response_format field to the response and use that field to decide the decode format; this check will be removed then. Is that OK?
}
}

switch r.encodeType {
case tipb.EncodeType_TypeDefault:
return r.readFromDefault(ctx, chk)
case tipb.EncodeType_TypeArrow:
return r.readFromArrow(ctx, chk)
}
return errors.Errorf("unsupported encode type:%v", r.encodeType)
}

func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) error {
for !chk.IsFull() {
if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) {
if r.respChkIdx == len(r.selectResp.Chunks) {
err := r.getSelectResp()
if err != nil || r.selectResp == nil {
return err
Expand All @@ -163,6 +186,14 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}

func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error {
rowBatchData := r.selectResp.RowBatchData
codec := chunk.NewCodec(r.fieldTypes)
remained := codec.DecodeToChunk(rowBatchData, chk)
r.selectResp.RowBatchData = remained
return nil
}

func (r *selectResult) getSelectResp() error {
r.respChkIdx = 0
for {
@@ -196,7 +227,7 @@ func (r *selectResult) getSelectResp() error {
r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts)
r.partialCount++
sc.MergeExecDetails(re.result.GetExecDetails(), nil)
if len(r.selectResp.Chunks) == 0 {
if len(r.selectResp.Chunks) == 0 && len(r.selectResp.RowBatchData) == 0 {
continue
}
return nil
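As a self-contained sketch of the decode round trip that readFromArrow relies on (hypothetical data; the chunk.Codec calls are the same ones exercised by the benchmarks above, assuming the 2019 import paths):

package main

import (
	"fmt"

	"github.com/pingcap/parser/mysql"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
)

func main() {
	colTypes := []*types.FieldType{{Tp: mysql.TypeLonglong}}

	// Encode a chunk the way mocktikv fills RowBatchData.
	src := chunk.New(colTypes, 4, 4)
	for i := 0; i < 4; i++ {
		src.AppendInt64(0, int64(i))
	}
	codec := chunk.NewCodec(colTypes)
	buf := codec.Encode(src)

	// Decode it back, as readFromArrow does. Leftover bytes are handed
	// back so the next Next() call can pick up where this one stopped.
	dst := chunk.New(colTypes, 4, 4)
	remained := codec.DecodeToChunk(buf, dst)
	fmt.Println(dst.NumRows(), len(remained)) // 4 0
}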
5 changes: 3 additions & 2 deletions executor/admin.go
@@ -146,6 +146,7 @@ func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) {
if err != nil {
return nil, err
}
distsql.SetEncodeType(e.ctx, dagReq)
return dagReq, nil
}

Expand Down Expand Up @@ -249,7 +250,7 @@ func (e *RecoverIndexExec) buildDAGPB(txn kv.Transaction, limitCnt uint64) (*tip

limitExec := e.constructLimitPB(limitCnt)
dagReq.Executors = append(dagReq.Executors, limitExec)

distsql.SetEncodeType(e.ctx, dagReq)
return dagReq, nil
}

Expand Down Expand Up @@ -684,7 +685,7 @@ func (e *CleanupIndexExec) buildIdxDAGPB(txn kv.Transaction) (*tipb.DAGRequest,

limitExec := e.constructLimitPB()
dagReq.Executors = append(dagReq.Executors, limitExec)

distsql.SetEncodeType(e.ctx, dagReq)
return dagReq, nil
}

2 changes: 2 additions & 0 deletions executor/builder.go
@@ -1699,6 +1699,8 @@ func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dag
sc := b.ctx.GetSessionVars().StmtCtx
dagReq.Flags = sc.PushDownFlags()
dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans)

distsql.SetEncodeType(b.ctx, dagReq)
return dagReq, streaming, err
}

7 changes: 6 additions & 1 deletion executor/table_readers_required_rows_test.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table/tables"
@@ -128,7 +129,11 @@ func buildTableReader(sctx sessionctx.Context) Executor {

func buildMockDAGRequest(sctx sessionctx.Context) *tipb.DAGRequest {
builder := newExecutorBuilder(sctx, nil)
req, _, err := builder.constructDAGReq(nil)
req, _, err := builder.constructDAGReq([]core.PhysicalPlan{&core.PhysicalTableScan{
Columns: []*model.ColumnInfo{},
Table: &model.TableInfo{ID: 12345, PKIsHandle: false},
Desc: false,
}})
if err != nil {
panic(err)
}
7 changes: 4 additions & 3 deletions expression/distsql_builtin.go
@@ -28,7 +28,8 @@ import (
"github.com/pingcap/tipb/go-tipb"
)

func pbTypeToFieldType(tp *tipb.FieldType) *types.FieldType {
// PbTypeToFieldType converts a tipb.FieldType to a types.FieldType.
func PbTypeToFieldType(tp *tipb.FieldType) *types.FieldType {
return &types.FieldType{
Tp: byte(tp.Tp),
Flag: uint(tp.Flag),
@@ -40,7 +41,7 @@ func pbTypeToFieldType(tp *tipb.FieldType) *types.FieldType {
}

func getSignatureByPB(ctx sessionctx.Context, sigCode tipb.ScalarFuncSig, tp *tipb.FieldType, args []Expression) (f builtinFunc, e error) {
fieldTp := pbTypeToFieldType(tp)
fieldTp := PbTypeToFieldType(tp)
base := newBaseBuiltinFunc(ctx, args)
base.tp = fieldTp
switch sigCode {
Expand Down Expand Up @@ -551,7 +552,7 @@ func PBToExpr(expr *tipb.Expr, tps []*types.FieldType, sc *stmtctx.StatementCont
}

func convertTime(data []byte, ftPB *tipb.FieldType, tz *time.Location) (*Constant, error) {
ft := pbTypeToFieldType(ftPB)
ft := PbTypeToFieldType(ftPB)
_, v, err := codec.DecodeUint(data)
if err != nil {
return nil, err
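A short sketch of why the export matters: external packages such as mocktikv can now map a wire-level column type to a TiDB one, e.g. to build a chunk.Codec for the Chunk-encoded response. The snippet below is hypothetical (values chosen for illustration); the conversion is exactly the function exported above:

import (
	"github.com/pingcap/parser/mysql"
	"github.com/pingcap/tidb/expression"
	"github.com/pingcap/tipb/go-tipb"
)

// A BIGINT column as it appears on the wire.
var bigintPb = &tipb.FieldType{Tp: int32(mysql.TypeLonglong), Flen: 20}

// The TiDB-side type, usable with chunk.NewCodec.
var bigintFt = expression.PbTypeToFieldType(bigintPb)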
2 changes: 1 addition & 1 deletion session/session_test.go
@@ -2018,7 +2018,7 @@ func (s *testSchemaSuite) TestTableReaderChunk(c *C) {
}
c.Assert(count, Equals, 100)
// FIXME: revert this result to new group value after distsql can handle initChunkSize.
c.Assert(numChunks, Equals, 1)
c.Assert(numChunks, Equals, 10)
[inline review]
Member: Why was it changed to 10?
Contributor (author): In this test case, SplitTable splits the table into 10 regions, so the response contains 10 chunks. With the default format all results are appended into one chunk; with the Arrow format the chunks are returned directly, so numChunks becomes 10.

rs.Close()
}

13 changes: 13 additions & 0 deletions sessionctx/variable/session.go
@@ -352,6 +352,9 @@ type SessionVars struct {
// TODO: remove this after tidb-server configuration "enable-streaming' removed.
EnableStreaming bool

// EnableArrow indicates whether the coprocessor request can use the Arrow (Chunk) encoding.
EnableArrow bool
[inline review]
Member: We could make this config item hot-reloadable, like the other hot-reloadable config items, so it can be changed without restarting the tidb-server. It's not urgent; we can do this in another PR.

writeStmtBufs WriteStmtBufs

// L2CacheSize indicates the size of CPU L2 cache, using byte as unit.
@@ -525,6 +528,14 @@ func NewSessionVars() *SessionVars {
enableStreaming = "0"
}
terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming))

var enableArrow string
if config.GetGlobalConfig().TiKVClient.EnableArrow {
enableArrow = "1"
} else {
enableArrow = "0"
}
terror.Log(vars.SetSystemVar(TiDBEnableArrow, enableArrow))
return vars
}

@@ -848,6 +859,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.DisableTxnAutoRetry = TiDBOptOn(val)
case TiDBEnableStreaming:
s.EnableStreaming = TiDBOptOn(val)
case TiDBEnableArrow:
s.EnableArrow = TiDBOptOn(val)
case TiDBEnableCascadesPlanner:
s.EnableCascadesPlanner = TiDBOptOn(val)
case TiDBOptimizerSelectivityLevel:
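Finally, a hedged sketch of flipping the switch per session through the new system variable; the "1"/"0" convention matches what NewSessionVars does above:

import "github.com/pingcap/tidb/sessionctx/variable"

func disableArrowForSession(vars *variable.SessionVars) error {
	// TiDBOptOn treats "1"/"ON" as true, so "0" turns the feature off and
	// distsql.SetEncodeType falls back to tipb.EncodeType_TypeDefault.
	return vars.SetSystemVar(variable.TiDBEnableArrow, "0")
}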