*: rename Arrow to Chunk #13060

Merged
5 commits merged on Nov 6, 2019
Changes from 1 commit
6 changes: 3 additions & 3 deletions config/config.go
@@ -369,8 +369,8 @@ type TiKVClient struct {
 MaxBatchWaitTime time.Duration `toml:"max-batch-wait-time" json:"max-batch-wait-time"`
 // BatchWaitSize is the max wait size for batch.
 BatchWaitSize uint `toml:"batch-wait-size" json:"batch-wait-size"`
-// EnableArrow indicate the data encode in arrow format.
-EnableArrow bool `toml:"enable-arrow" json:"enable-arrow"`
+// EnableChunkResponse indicate the data encode in chunk format for coprocessor requests.
+EnableChunkResponse bool `toml:"enable-chunk-response" json:"enable-chunk-response"`
 // If a Region has not been accessed for more than the given duration (in seconds), it
 // will be reloaded from the PD.
 RegionCacheTTL uint `toml:"region-cache-ttl" json:"region-cache-ttl"`
@@ -499,7 +499,7 @@ var defaultConf = Config{
 MaxBatchWaitTime: 0,
 BatchWaitSize: 8,

-EnableArrow: true,
+EnableChunkResponse: true,

 RegionCacheTTL: 600,
 },
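For readers following the rename, here is a minimal, hypothetical sketch of how the new `enable-chunk-response` key binds to the renamed struct field through its `toml` tag. It assumes a generic TOML decoder (BurntSushi/toml is used below); the real decoding path goes through the full `config.Config` struct and is not part of this diff.

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml" // assumption: any TOML decoder with struct-tag support behaves the same way
)

// tikvClient mirrors only the renamed field of config.TiKVClient; the real
// struct in config/config.go carries many more options.
type tikvClient struct {
	EnableChunkResponse bool `toml:"enable-chunk-response" json:"enable-chunk-response"`
}

func main() {
	const snippet = "enable-chunk-response = true\n"

	var c tikvClient
	if _, err := toml.Decode(snippet, &c); err != nil {
		panic(err)
	}
	fmt.Println(c.EnableChunkResponse) // true
}
```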
4 changes: 2 additions & 2 deletions config/config.toml.example
@@ -283,8 +283,8 @@ max-batch-wait-time = 0
 # Batch wait size, to avoid waiting too long.
 batch-wait-size = 8

-# Enable chunk encoded data.
-enable-arrow = true
+# Enable chunk encoded data for coprocessor requests.
+enable-chunk-response = true
Review comment from a Member:
how about s/enable-chunk-response/enable-chunk-rpc/?


 # If a Region has not been accessed for more than the given duration (in seconds), it
 # will be reloaded from the PD.
14 changes: 7 additions & 7 deletions distsql/distsql.go
@@ -76,8 +76,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
 }, nil
 }
 encodetype := tipb.EncodeType_TypeDefault
-if enableTypeArrow(sctx) {
-encodetype = tipb.EncodeType_TypeArrow
+if enableTypeChunk(sctx) {
+encodetype = tipb.EncodeType_TypeChunk
 }
 return &selectResult{
 label: "dag",
@@ -152,18 +152,18 @@ func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv

 // SetEncodeType sets the encoding method for the DAGRequest. The supported encoding
 // methods are:
-// 1. TypeArrow: the result is encoded using the Chunk format, refer util/chunk/chunk.go
+// 1. TypeChunk: the result is encoded using the Chunk format, refer util/chunk/chunk.go
 // 2. TypeDefault: the result is encoded row by row
 func SetEncodeType(ctx sessionctx.Context, dagReq *tipb.DAGRequest) {
-if enableTypeArrow(ctx) {
-dagReq.EncodeType = tipb.EncodeType_TypeArrow
+if enableTypeChunk(ctx) {
+dagReq.EncodeType = tipb.EncodeType_TypeChunk
 } else {
 dagReq.EncodeType = tipb.EncodeType_TypeDefault
 }
 }

-func enableTypeArrow(ctx sessionctx.Context) bool {
-if !ctx.GetSessionVars().EnableArrow {
+func enableTypeChunk(ctx sessionctx.Context) bool {
Review comment from a Member:
how about s/enableTypeChunk/canUseChunkRPC/

+if !ctx.GetSessionVars().EnableChunkResponse {
 return false
 }
 if ctx.GetSessionVars().EnableStreaming {
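To make the selection logic easier to follow (and to illustrate the reviewer's `canUseChunkRPC` naming suggestion), here is a hedged, self-contained sketch of the decision this function makes. The `sessionOptions` type and the treatment of streaming are assumptions based only on the visible part of the diff, not a copy of the full TiDB implementation.

```go
package main

import "fmt"

// sessionOptions is a stand-in for the relevant fields of variable.SessionVars.
type sessionOptions struct {
	EnableChunkResponse bool // the switch renamed in this PR
	EnableStreaming     bool // streaming coprocessor API, assumed to take precedence
}

// canUseChunkRPC mirrors the shape of enableTypeChunk in distsql/distsql.go:
// chunk-encoded responses are requested only when the switch is on and the
// streaming API is not in use (assumption: streaming keeps the default encoding).
func canUseChunkRPC(vars sessionOptions) bool {
	if !vars.EnableChunkResponse {
		return false
	}
	if vars.EnableStreaming {
		return false
	}
	return true
}

func main() {
	fmt.Println(canUseChunkRPC(sessionOptions{EnableChunkResponse: true}))                        // true
	fmt.Println(canUseChunkRPC(sessionOptions{EnableChunkResponse: true, EnableStreaming: true})) // false
}
```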
12 changes: 6 additions & 6 deletions distsql/distsql_test.go
@@ -127,7 +127,7 @@ func (s *testSuite) TestSelectMemTracker(c *C) {
 }

 func (s *testSuite) TestSelectNormalChunkSize(c *C) {
-s.sctx.GetSessionVars().EnableArrow = false
+s.sctx.GetSessionVars().EnableChunkResponse = false
 response, colTypes := s.createSelectNormal(100, 1000000, c, nil)
 response.Fetch(context.TODO())
 s.testChunkSize(response, colTypes, c)
@@ -289,7 +289,7 @@ func (s *testSuite) testChunkSize(response SelectResult, colTypes []*types.Field
 }

 func (s *testSuite) TestAnalyze(c *C) {
-s.sctx.GetSessionVars().EnableArrow = false
+s.sctx.GetSessionVars().EnableChunkResponse = false
 request, err := (&RequestBuilder{}).SetKeyRanges(nil).
 SetAnalyzeRequest(&tipb.AnalyzeReq{}).
 SetKeepOrder(true).
@@ -315,7 +315,7 @@ func (s *testSuite) TestAnalyze(c *C) {
 }

 func (s *testSuite) TestChecksum(c *C) {
-s.sctx.GetSessionVars().EnableArrow = false
+s.sctx.GetSessionVars().EnableChunkResponse = false
 request, err := (&RequestBuilder{}).SetKeyRanges(nil).
 SetChecksumRequest(&tipb.ChecksumRequest{}).
 Build()
@@ -370,7 +370,7 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
 resp.count += numRows

 var chunks []tipb.Chunk
-if !enableTypeArrow(resp.ctx) {
+if !enableTypeChunk(resp.ctx) {
 datum := types.NewIntDatum(1)
 bytes := make([]byte, 0, 100)
 bytes, _ = codec.EncodeValue(nil, bytes, datum, datum, datum, datum)
@@ -408,8 +408,8 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
 Chunks: chunks,
 OutputCounts: []int64{1},
 }
-if enableTypeArrow(resp.ctx) {
-respPB.EncodeType = tipb.EncodeType_TypeArrow
+if enableTypeChunk(resp.ctx) {
+respPB.EncodeType = tipb.EncodeType_TypeChunk
 } else {
 respPB.EncodeType = tipb.EncodeType_TypeDefault
 }
8 changes: 4 additions & 4 deletions distsql/select_result.go
@@ -147,7 +147,7 @@ func (r *selectResult) NextRaw(ctx context.Context) (data []byte, err error) {
 // Next reads data to the chunk.
 func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
 chk.Reset()
-// Check the returned data is default/arrow format.
+// Check the returned data is default/chunk format.
 if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) {
 err := r.getSelectResp()
 if err != nil || r.selectResp == nil {
@@ -158,8 +158,8 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
 switch r.selectResp.GetEncodeType() {
 case tipb.EncodeType_TypeDefault:
 return r.readFromDefault(ctx, chk)
-case tipb.EncodeType_TypeArrow:
-return r.readFromArrow(ctx, chk)
+case tipb.EncodeType_TypeChunk:
+return r.readFromChunk(ctx, chk)
 }
 return errors.Errorf("unsupported encode type:%v", r.encodeType)
 }
@@ -183,7 +183,7 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er
 return nil
 }

-func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error {
+func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) error {
 if r.respChunkDecoder == nil {
 r.respChunkDecoder = chunk.NewDecoder(
 chunk.NewChunkWithCapacity(r.fieldTypes, 0),
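Whichever encoding the coprocessor returns, callers drain a select result the same way: call Next with a reusable chunk until it comes back empty. A minimal sketch of that consumption pattern, with the result and chunk types reduced to hypothetical stand-ins so it compiles on its own (the real interface is distsql.SelectResult and *chunk.Chunk):

```go
package main

import (
	"context"
	"fmt"
)

// rowBatch stands in for *chunk.Chunk: a reusable, columnar batch of rows.
type rowBatch struct{ rows int }

func (b *rowBatch) Reset()       { b.rows = 0 }
func (b *rowBatch) NumRows() int { return b.rows }

// selectResult is a hypothetical cut-down of distsql.SelectResult; the real
// interface also exposes Fetch, NextRaw and Close.
type selectResult interface {
	Next(ctx context.Context, chk *rowBatch) error
}

// drain shows the canonical loop: Next refills chk, and an empty batch
// signals that the result set is exhausted.
func drain(ctx context.Context, r selectResult, chk *rowBatch) (total int, err error) {
	for {
		if err = r.Next(ctx, chk); err != nil {
			return total, err
		}
		if chk.NumRows() == 0 {
			return total, nil
		}
		total += chk.NumRows()
	}
}

// fakeResult returns two non-empty batches and then an empty one.
type fakeResult struct{ calls int }

func (f *fakeResult) Next(ctx context.Context, chk *rowBatch) error {
	chk.Reset()
	if f.calls < 2 {
		chk.rows = 3
	}
	f.calls++
	return nil
}

func main() {
	n, _ := drain(context.Background(), &fakeResult{}, &rowBatch{})
	fmt.Println(n) // 6
}
```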
2 changes: 1 addition & 1 deletion go.mod
@@ -46,7 +46,7 @@ require (
 github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567
 github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
 github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible
-github.com/pingcap/tipb v0.0.0-20191029074152-e6f0f14af644
+github.com/pingcap/tipb v0.0.0-20191105120856-bd4b782c8393
 github.com/prometheus/client_golang v0.9.0
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
 github.com/prometheus/common v0.0.0-20181020173914-7e9e6cabbd39 // indirect
4 changes: 2 additions & 2 deletions go.sum
@@ -176,8 +176,8 @@ github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 h1:GIEq+wZfrl2bc
 github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0/go.mod h1:G/6rJpnYwM0LKMec2rI82/5Kg6GaZMvlfB+e6/tvYmI=
 github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU=
 github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
-github.com/pingcap/tipb v0.0.0-20191029074152-e6f0f14af644 h1:J+nYGNqumgP4jtBz5Nqre1wiE/HrLXrJpFpqOotfoNc=
-github.com/pingcap/tipb v0.0.0-20191029074152-e6f0f14af644/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
+github.com/pingcap/tipb v0.0.0-20191105120856-bd4b782c8393 h1:8XcpRME085GsIe3eiJGhmuDPAjG8CUa8VE/QnQAwmfM=
+github.com/pingcap/tipb v0.0.0-20191105120856-bd4b782c8393/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
18 changes: 9 additions & 9 deletions sessionctx/variable/session.go
@@ -366,8 +366,8 @@ type SessionVars struct {
 // TODO: remove this after tidb-server configuration "enable-streaming' removed.
 EnableStreaming bool

-// EnableArrow indicates whether the coprocessor request can use arrow API.
-EnableArrow bool
+// EnableChunkResponse indicates whether the coprocessor request can use chunk API.
+EnableChunkResponse bool

 writeStmtBufs WriteStmtBufs

@@ -566,13 +566,13 @@ func NewSessionVars() *SessionVars {
 }
 terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming))

-var enableArrow string
-if config.GetGlobalConfig().TiKVClient.EnableArrow {
-enableArrow = "1"
+var enableChunkResponse string
+if config.GetGlobalConfig().TiKVClient.EnableChunkResponse {
+enableChunkResponse = "1"
 } else {
-enableArrow = "0"
+enableChunkResponse = "0"
 }
-terror.Log(vars.SetSystemVar(TiDBEnableArrow, enableArrow))
+terror.Log(vars.SetSystemVar(TiDBEnableChunkResponse, enableChunkResponse))
 return vars
 }

@@ -920,8 +920,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
 s.DisableTxnAutoRetry = TiDBOptOn(val)
 case TiDBEnableStreaming:
 s.EnableStreaming = TiDBOptOn(val)
-case TiDBEnableArrow:
-s.EnableArrow = TiDBOptOn(val)
+case TiDBEnableChunkResponse:
+s.EnableChunkResponse = TiDBOptOn(val)
 case TiDBEnableCascadesPlanner:
 s.EnableCascadesPlanner = TiDBOptOn(val)
 case TiDBOptimizerSelectivityLevel:
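The hunk above bridges a bool from the TOML config into a "1"/"0" string system variable, and TiDBOptOn turns that string back into a bool when the variable is set. A small sketch of both directions; the exact accepted spellings are an assumption (approximated from how the "1"/"0" bridge is used here), not a copy of the real helper.

```go
package main

import (
	"fmt"
	"strings"
)

// optOn approximates variable.TiDBOptOn: session switches are stored as
// strings, and both "1" and "ON" (case-insensitive) are assumed to count
// as enabled.
func optOn(val string) bool {
	return val == "1" || strings.EqualFold(val, "ON")
}

// boolToSysVar is the inverse direction used in NewSessionVars: a bool from
// the TOML config becomes the "1"/"0" string stored in the system variable.
func boolToSysVar(b bool) string {
	if b {
		return "1"
	}
	return "0"
}

func main() {
	fmt.Println(boolToSysVar(true), optOn("1"), optOn("ON"), optOn("0")) // 1 true true false
}
```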
2 changes: 1 addition & 1 deletion sessionctx/variable/sysvar.go
@@ -684,7 +684,7 @@ var defaultSysVars = []*SysVar{
 {ScopeSession, TIDBMemQuotaIndexLookupJoin, strconv.FormatInt(DefTiDBMemQuotaIndexLookupJoin, 10)},
 {ScopeSession, TIDBMemQuotaNestedLoopApply, strconv.FormatInt(DefTiDBMemQuotaNestedLoopApply, 10)},
 {ScopeSession, TiDBEnableStreaming, "0"},
-{ScopeSession, TiDBEnableArrow, "1"},
+{ScopeSession, TiDBEnableChunkResponse, "1"},
 {ScopeSession, TxnIsolationOneShot, ""},
 {ScopeSession, TiDBEnableTablePartition, "auto"},
 {ScopeGlobal | ScopeSession, TiDBHashJoinConcurrency, strconv.Itoa(DefTiDBHashJoinConcurrency)},
4 changes: 2 additions & 2 deletions sessionctx/variable/tidb_vars.go
@@ -124,8 +124,8 @@ const (
 // tidb_enable_streaming enables TiDB to use streaming API for coprocessor requests.
 TiDBEnableStreaming = "tidb_enable_streaming"

-// tidb_enable_arrow enables TiDB to use Chunk format for coprocessor requests.
-TiDBEnableArrow = "tidb_enable_arrow"
+// tidb_enable_chunk_response enables TiDB to use Chunk format for coprocessor requests.
+TiDBEnableChunkResponse = "tidb_enable_chunk_response"
Review comment from a Member:
ditto


 // tidb_optimizer_selectivity_level is used to control the selectivity estimation level.
 TiDBOptimizerSelectivityLevel = "tidb_optimizer_selectivity_level"
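Since tidb_enable_chunk_response is registered with session scope (see sysvar.go above), it can be toggled per connection. A hedged usage sketch via database/sql: the DSN is a placeholder and the go-sql-driver import is an assumption about the client, not part of this PR. A dedicated connection is pinned because session variables only affect the connection they are set on.

```go
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // assumption: any MySQL-protocol driver works against TiDB
)

func main() {
	ctx := context.Background()

	// Placeholder DSN; point it at a running TiDB instance.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Pin one connection so the SET and the later queries share a session.
	conn, err := db.Conn(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Disable chunk-encoded coprocessor responses for this session only.
	if _, err := conn.ExecContext(ctx, "SET @@session.tidb_enable_chunk_response = 0"); err != nil {
		log.Fatal(err)
	}

	var enabled string
	if err := conn.QueryRowContext(ctx, "SELECT @@session.tidb_enable_chunk_response").Scan(&enabled); err != nil {
		log.Fatal(err)
	}
	log.Println("tidb_enable_chunk_response =", enabled) // expected "0"
}
```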
2 changes: 1 addition & 1 deletion sessionctx/variable/varsutil.go
@@ -393,7 +393,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
 return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
 case TiDBSkipUTF8Check, TiDBOptAggPushDown,
 TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze,
-TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, TiDBEnableArrow,
+TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, TiDBEnableChunkResponse,
 TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction,
 TiDBCheckMb4ValueInUTF8, TiDBLowResolutionTSO, TiDBEnableIndexMerge, TiDBEnableNoopFuncs,
 TiDBScatterRegion, TiDBGeneralLog, TiDBConstraintCheckInPlace, TiDBEnableVectorizedExpression, TiDBRecordPlanInSlowLog:
2 changes: 1 addition & 1 deletion statistics/handle/handle.go
@@ -84,7 +84,7 @@ func (h *Handle) Clear() {
 h.feedback = h.feedback[:0]
 h.mu.ctx.GetSessionVars().InitChunkSize = 1
 h.mu.ctx.GetSessionVars().MaxChunkSize = 1
-h.mu.ctx.GetSessionVars().EnableArrow = false
+h.mu.ctx.GetSessionVars().EnableChunkResponse = false
 h.mu.ctx.GetSessionVars().ProjectionConcurrency = 0
 h.listHead = &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)}
 h.globalMap = make(tableDeltaMap)
8 changes: 4 additions & 4 deletions store/mockstore/mocktikv/cop_handler_dag.go
@@ -563,10 +563,10 @@ func (h *rpcHandler) fillUpData4SelectResponse(selResp *tipb.SelectResponse, dag
 switch dagReq.EncodeType {
 case tipb.EncodeType_TypeDefault:
 h.encodeDefault(selResp, rows, dagReq.OutputOffsets)
-case tipb.EncodeType_TypeArrow:
+case tipb.EncodeType_TypeChunk:
 colTypes := h.constructRespSchema(dagCtx)
 loc := dagCtx.evalCtx.sc.TimeZone
-err := h.encodeArrow(selResp, rows, colTypes, dagReq.OutputOffsets, loc)
+err := h.encodeChunk(selResp, rows, colTypes, dagReq.OutputOffsets, loc)
 if err != nil {
 return err
 }
@@ -612,7 +612,7 @@ func (h *rpcHandler) encodeDefault(selResp *tipb.SelectResponse, rows [][][]byte
 selResp.EncodeType = tipb.EncodeType_TypeDefault
 }

-func (h *rpcHandler) encodeArrow(selResp *tipb.SelectResponse, rows [][][]byte, colTypes []*types.FieldType, colOrdinal []uint32, loc *time.Location) error {
+func (h *rpcHandler) encodeChunk(selResp *tipb.SelectResponse, rows [][][]byte, colTypes []*types.FieldType, colOrdinal []uint32, loc *time.Location) error {
 var chunks []tipb.Chunk
 respColTypes := make([]*types.FieldType, 0, len(colOrdinal))
 for _, ordinal := range colOrdinal {
@@ -642,7 +642,7 @@ func (h *rpcHandler) encodeArrow(selResp *tipb.SelectResponse, rows [][][]byte,
 chk.Reset()
 }
 selResp.Chunks = chunks
-selResp.EncodeType = tipb.EncodeType_TypeArrow
+selResp.EncodeType = tipb.EncodeType_TypeChunk
 return nil
 }
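The renamed server-side encoder follows a simple batching pattern: rows are appended to an in-memory chunk, and the chunk is flushed into the response (then Reset) every fixed number of rows, including a final partial flush. A minimal, self-contained sketch of that pattern; `batch` is a stand-in for *chunk.Chunk and the flush size of 1024 is an assumption, not a value taken from this diff.

```go
package main

import "fmt"

const rowsPerChunk = 1024 // assumed flush threshold; the real value lives in mocktikv, not in this diff

// batch stands in for *chunk.Chunk on the encoding side.
type batch struct{ rows [][]byte }

func (b *batch) append(row []byte) { b.rows = append(b.rows, row) }
func (b *batch) numRows() int      { return len(b.rows) }
func (b *batch) reset()            { b.rows = b.rows[:0] }

// encodeChunks mimics the shape of encodeChunk in cop_handler_dag.go:
// append each row, and every rowsPerChunk rows serialize the batch into the
// growing response, then reuse the same in-memory batch.
func encodeChunks(rows [][]byte) (flushed int) {
	chk := &batch{}
	flush := func() {
		if chk.numRows() > 0 {
			flushed++ // in the real code: append a serialized tipb.Chunk to selResp.Chunks
			chk.reset()
		}
	}
	for _, row := range rows {
		chk.append(row)
		if chk.numRows() >= rowsPerChunk {
			flush()
		}
	}
	flush() // don't forget the trailing, partially filled batch
	return flushed
}

func main() {
	rows := make([][]byte, 2500)
	fmt.Println(encodeChunks(rows)) // 3 chunks: 1024 + 1024 + 452 rows
}
```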
2 changes: 1 addition & 1 deletion tidb-server/main.go
@@ -363,7 +363,7 @@ func loadConfig() string {
 var hotReloadConfigItems = []string{"Performance.MaxProcs", "Performance.MaxMemory", "Performance.CrossJoin",
 "Performance.FeedbackProbability", "Performance.QueryFeedbackLimit", "Performance.PseudoEstimateRatio",
 "OOMUseTmpStorage", "OOMAction", "MemQuotaQuery", "StmtSummary.MaxStmtCount", "StmtSummary.MaxSQLLength", "Log.QueryLogMaxLen",
-"TiKVClient.EnableArrow"}
+"TiKVClient.EnableChunkResponse"}

 func reloadConfig(nc, c *config.Config) {
 // Just a part of config items need to be reload explicitly.
3 changes: 1 addition & 2 deletions util/chunk/chunk.go
@@ -25,8 +25,7 @@ import (

 var msgErrSelNotNil = "The selection vector of Chunk is not nil. Please file a bug to the TiDB Team"

-// Chunk stores multiple rows of data in Apache Arrow format.
-// See https://arrow.apache.org/docs/memory_layout.html
+// Chunk stores multiple rows of data.
 // Values are appended in compact format and can be directly accessed without decoding.
 // When the chunk is done processing, we can reuse the allocated memory by resetting it.
 type Chunk struct {
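Since the doc comment above is the main description of the (now Arrow-free) Chunk type, a short usage sketch may help: values are appended column by column, read back through Row accessors, and Reset reuses the allocated memory for the next batch. The import paths are assumed to match the TiDB tree this PR targets; treat the snippet as illustrative rather than exact.

```go
package main

import (
	"fmt"

	"github.com/pingcap/parser/mysql" // assumed path for the MySQL type constants in this era of TiDB
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
)

func main() {
	// One bigint column and one varchar column.
	fieldTypes := []*types.FieldType{
		types.NewFieldType(mysql.TypeLonglong),
		types.NewFieldType(mysql.TypeVarchar),
	}

	chk := chunk.NewChunkWithCapacity(fieldTypes, 4)

	// Values are appended per column; a row exists once every column
	// of that row has been appended.
	chk.AppendInt64(0, 42)
	chk.AppendString(1, "hello")
	chk.AppendInt64(0, 43)
	chk.AppendString(1, "world")

	for i := 0; i < chk.NumRows(); i++ {
		row := chk.GetRow(i)
		fmt.Println(row.GetInt64(0), row.GetString(1))
	}

	// Reuse the allocated memory for the next batch.
	chk.Reset()
	fmt.Println(chk.NumRows()) // 0
}
```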
2 changes: 1 addition & 1 deletion util/chunk/chunk_test.go
@@ -100,7 +100,7 @@ func (s *testChunkSuite) TestChunk(c *check.C) {
 row := chk.GetRow(0)
 c.Assert(row.GetFloat32(0), check.Equals, f32Val)
 c.Assert(row.GetTime(2).Compare(tVal), check.Equals, 0)
-// fsp no longer maintain in arrow
+// fsp no longer maintain in chunk
 c.Assert(row.GetDuration(3, 0).Duration, check.DeepEquals, durVal.Duration)
 c.Assert(row.GetEnum(4), check.DeepEquals, enumVal)
 c.Assert(row.GetSet(5), check.DeepEquals, setVal)
3 changes: 1 addition & 2 deletions util/chunk/column.go
@@ -55,8 +55,7 @@ func (c *Column) AppendSet(set types.Set) {
 c.appendNameValue(set.Name, set.Value)
 }

-// Column stores one column of data in Apache Arrow format.
-// See https://arrow.apache.org/docs/memory_layout.html
+// Column stores one column of data.
 type Column struct {
 length int
 nullBitmap []byte // bit 0 is null, 1 is not null
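The nullBitmap field visible above encodes NULLs one bit per row, with 0 meaning NULL and 1 meaning not NULL. A small, self-contained sketch of that convention, independent of the real Column implementation:

```go
package main

import "fmt"

// nullBitmap mirrors the convention in Column.nullBitmap: one bit per row,
// bit value 0 means NULL, 1 means not NULL.
type nullBitmap []byte

// setNotNull marks row i as not NULL, growing the bitmap as needed.
func (b *nullBitmap) setNotNull(i int) {
	for len(*b) <= i/8 {
		*b = append(*b, 0)
	}
	(*b)[i/8] |= 1 << (uint(i) % 8)
}

// isNull reports whether row i is NULL (its bit is 0 or out of range).
func (b nullBitmap) isNull(i int) bool {
	if i/8 >= len(b) {
		return true
	}
	return b[i/8]&(1<<(uint(i)%8)) == 0
}

func main() {
	var b nullBitmap
	b.setNotNull(0)
	b.setNotNull(2)
	fmt.Println(b.isNull(0), b.isNull(1), b.isNull(2)) // false true false
}
```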