Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: rename Arrow to Chunk #13060

Merged
merged 5 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ type TiKVClient struct {
MaxBatchWaitTime time.Duration `toml:"max-batch-wait-time" json:"max-batch-wait-time"`
// BatchWaitSize is the max wait size for batch.
BatchWaitSize uint `toml:"batch-wait-size" json:"batch-wait-size"`
// EnableArrow indicate the data encode in arrow format.
EnableArrow bool `toml:"enable-arrow" json:"enable-arrow"`
// EnableChunkRPC indicates whether the data is encoded in chunk format for coprocessor requests.
EnableChunkRPC bool `toml:"enable-chunk-rpc" json:"enable-chunk-rpc"`
// If a Region has not been accessed for more than the given duration (in seconds), it
// will be reloaded from the PD.
RegionCacheTTL uint `toml:"region-cache-ttl" json:"region-cache-ttl"`
Expand Down Expand Up @@ -499,7 +499,7 @@ var defaultConf = Config{
MaxBatchWaitTime: 0,
BatchWaitSize: 8,

EnableArrow: true,
EnableChunkRPC: true,

RegionCacheTTL: 600,
},
Expand Down
4 changes: 2 additions & 2 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ max-batch-wait-time = 0
# Batch wait size, to avoid waiting too long.
batch-wait-size = 8

# Enable chunk encoded data.
enable-arrow = true
# Enable chunk encoded data for coprocessor requests.
enable-chunk-rpc = true

# If a Region has not been accessed for more than the given duration (in seconds), it
# will be reloaded from the PD.
Expand Down
14 changes: 7 additions & 7 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
}, nil
}
encodetype := tipb.EncodeType_TypeDefault
if enableTypeArrow(sctx) {
encodetype = tipb.EncodeType_TypeArrow
if canUseChunkRPC(sctx) {
encodetype = tipb.EncodeType_TypeChunk
}
return &selectResult{
label: "dag",
Expand Down Expand Up @@ -152,18 +152,18 @@ func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv

// SetEncodeType sets the encoding method for the DAGRequest. The supported encoding
// methods are:
// 1. TypeArrow: the result is encoded using the Chunk format, refer util/chunk/chunk.go
// 1. TypeChunk: the result is encoded using the Chunk format, refer to util/chunk/chunk.go
// 2. TypeDefault: the result is encoded row by row
func SetEncodeType(ctx sessionctx.Context, dagReq *tipb.DAGRequest) {
if enableTypeArrow(ctx) {
dagReq.EncodeType = tipb.EncodeType_TypeArrow
if canUseChunkRPC(ctx) {
dagReq.EncodeType = tipb.EncodeType_TypeChunk
} else {
dagReq.EncodeType = tipb.EncodeType_TypeDefault
}
}

func enableTypeArrow(ctx sessionctx.Context) bool {
if !ctx.GetSessionVars().EnableArrow {
func canUseChunkRPC(ctx sessionctx.Context) bool {
if !ctx.GetSessionVars().EnableChunkRPC {
return false
}
if ctx.GetSessionVars().EnableStreaming {
Expand Down
12 changes: 6 additions & 6 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *testSuite) TestSelectMemTracker(c *C) {
}

func (s *testSuite) TestSelectNormalChunkSize(c *C) {
s.sctx.GetSessionVars().EnableArrow = false
s.sctx.GetSessionVars().EnableChunkRPC = false
response, colTypes := s.createSelectNormal(100, 1000000, c, nil)
response.Fetch(context.TODO())
s.testChunkSize(response, colTypes, c)
Expand Down Expand Up @@ -289,7 +289,7 @@ func (s *testSuite) testChunkSize(response SelectResult, colTypes []*types.Field
}

func (s *testSuite) TestAnalyze(c *C) {
s.sctx.GetSessionVars().EnableArrow = false
s.sctx.GetSessionVars().EnableChunkRPC = false
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetAnalyzeRequest(&tipb.AnalyzeReq{}).
SetKeepOrder(true).
Expand All @@ -315,7 +315,7 @@ func (s *testSuite) TestAnalyze(c *C) {
}

func (s *testSuite) TestChecksum(c *C) {
s.sctx.GetSessionVars().EnableArrow = false
s.sctx.GetSessionVars().EnableChunkRPC = false
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetChecksumRequest(&tipb.ChecksumRequest{}).
Build()
Expand Down Expand Up @@ -370,7 +370,7 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
resp.count += numRows

var chunks []tipb.Chunk
if !enableTypeArrow(resp.ctx) {
if !canUseChunkRPC(resp.ctx) {
datum := types.NewIntDatum(1)
bytes := make([]byte, 0, 100)
bytes, _ = codec.EncodeValue(nil, bytes, datum, datum, datum, datum)
Expand Down Expand Up @@ -408,8 +408,8 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
Chunks: chunks,
OutputCounts: []int64{1},
}
if enableTypeArrow(resp.ctx) {
respPB.EncodeType = tipb.EncodeType_TypeArrow
if canUseChunkRPC(resp.ctx) {
respPB.EncodeType = tipb.EncodeType_TypeChunk
} else {
respPB.EncodeType = tipb.EncodeType_TypeDefault
}
Expand Down
8 changes: 4 additions & 4 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (r *selectResult) NextRaw(ctx context.Context) (data []byte, err error) {
// Next reads data to the chunk.
func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
// Check the returned data is default/arrow format.
// Check whether the returned data is in default or chunk format.
if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) {
err := r.getSelectResp()
if err != nil || r.selectResp == nil {
Expand All @@ -158,8 +158,8 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
switch r.selectResp.GetEncodeType() {
case tipb.EncodeType_TypeDefault:
return r.readFromDefault(ctx, chk)
case tipb.EncodeType_TypeArrow:
return r.readFromArrow(ctx, chk)
case tipb.EncodeType_TypeChunk:
return r.readFromChunk(ctx, chk)
}
return errors.Errorf("unsupported encode type:%v", r.encodeType)
}
Expand All @@ -183,7 +183,7 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er
return nil
}

func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error {
func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) error {
if r.respChunkDecoder == nil {
r.respChunkDecoder = chunk.NewDecoder(
chunk.NewChunkWithCapacity(r.fieldTypes, 0),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/pingcap/parser v0.0.0-20191031081038-bfb0c3adf567
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible
github.com/pingcap/tipb v0.0.0-20191029074152-e6f0f14af644
github.com/pingcap/tipb v0.0.0-20191105120856-bd4b782c8393
github.com/prometheus/client_golang v0.9.0
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/prometheus/common v0.0.0-20181020173914-7e9e6cabbd39 // indirect
Expand Down
6 changes: 3 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 h1:GIEq+wZfrl2bc
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0/go.mod h1:G/6rJpnYwM0LKMec2rI82/5Kg6GaZMvlfB+e6/tvYmI=
github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible h1:H1jg0aDWz2SLRh3hNBo2HFtnuHtudIUvBumU7syRkic=
github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20191029074152-e6f0f14af644 h1:J+nYGNqumgP4jtBz5Nqre1wiE/HrLXrJpFpqOotfoNc=
github.com/pingcap/tipb v0.0.0-20191029074152-e6f0f14af644/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20191105120856-bd4b782c8393 h1:8XcpRME085GsIe3eiJGhmuDPAjG8CUa8VE/QnQAwmfM=
github.com/pingcap/tipb v0.0.0-20191105120856-bd4b782c8393/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -204,7 +204,6 @@ github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v1.0.1 h1:aCvUg6QPl3ibpQUxyLkrEkCHtPqYJL4x9AuhqVqFis4=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
Expand Down Expand Up @@ -243,6 +242,7 @@ github.com/yookoala/realpath v1.0.0/go.mod h1:gJJMA9wuX7AcqLy1+ffPatSCySA1FQ2S8Y
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20190320044326-77d4b742cdbf h1:rmttwKPEgG/l4UscTDYtaJgeUsedKPKSyFfNQLI6q+I=
go.etcd.io/etcd v0.0.0-20190320044326-77d4b742cdbf/go.mod h1:KSGwdbiFchh5KIC9My2+ZVl5/3ANcwohw50dpPwa2cw=
go.etcd.io/etcd v0.5.0-alpha.5.0.20190320044326-77d4b742cdbf h1:2pxGooJi3rmECPOvyqOyZgqqcKOF8Pg30aA1RXK4VuE=
go.etcd.io/etcd v0.5.0-alpha.5.0.20190320044326-77d4b742cdbf/go.mod h1:KSGwdbiFchh5KIC9My2+ZVl5/3ANcwohw50dpPwa2cw=
Expand Down
18 changes: 9 additions & 9 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ type SessionVars struct {
// TODO: remove this after tidb-server configuration "enable-streaming' removed.
EnableStreaming bool

// EnableArrow indicates whether the coprocessor request can use arrow API.
EnableArrow bool
// EnableChunkRPC indicates whether the coprocessor request can use chunk API.
EnableChunkRPC bool

writeStmtBufs WriteStmtBufs

Expand Down Expand Up @@ -566,13 +566,13 @@ func NewSessionVars() *SessionVars {
}
terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming))

var enableArrow string
if config.GetGlobalConfig().TiKVClient.EnableArrow {
enableArrow = "1"
var enableChunkRPC string
if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
enableChunkRPC = "1"
} else {
enableArrow = "0"
enableChunkRPC = "0"
}
terror.Log(vars.SetSystemVar(TiDBEnableArrow, enableArrow))
terror.Log(vars.SetSystemVar(TiDBEnableChunkRPC, enableChunkRPC))
return vars
}

Expand Down Expand Up @@ -920,8 +920,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.DisableTxnAutoRetry = TiDBOptOn(val)
case TiDBEnableStreaming:
s.EnableStreaming = TiDBOptOn(val)
case TiDBEnableArrow:
s.EnableArrow = TiDBOptOn(val)
case TiDBEnableChunkRPC:
s.EnableChunkRPC = TiDBOptOn(val)
case TiDBEnableCascadesPlanner:
s.EnableCascadesPlanner = TiDBOptOn(val)
case TiDBOptimizerSelectivityLevel:
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TIDBMemQuotaIndexLookupJoin, strconv.FormatInt(DefTiDBMemQuotaIndexLookupJoin, 10)},
{ScopeSession, TIDBMemQuotaNestedLoopApply, strconv.FormatInt(DefTiDBMemQuotaNestedLoopApply, 10)},
{ScopeSession, TiDBEnableStreaming, "0"},
{ScopeSession, TiDBEnableArrow, "1"},
{ScopeSession, TiDBEnableChunkRPC, "1"},
{ScopeSession, TxnIsolationOneShot, ""},
{ScopeSession, TiDBEnableTablePartition, "auto"},
{ScopeGlobal | ScopeSession, TiDBHashJoinConcurrency, strconv.Itoa(DefTiDBHashJoinConcurrency)},
Expand Down
4 changes: 2 additions & 2 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ const (
// tidb_enable_streaming enables TiDB to use streaming API for coprocessor requests.
TiDBEnableStreaming = "tidb_enable_streaming"

// tidb_enable_arrow enables TiDB to use Chunk format for coprocessor requests.
TiDBEnableArrow = "tidb_enable_arrow"
// tidb_enable_chunk_rpc enables TiDB to use Chunk format for coprocessor requests.
TiDBEnableChunkRPC = "tidb_enable_chunk_rpc"

// tidb_optimizer_selectivity_level is used to control the selectivity estimation level.
TiDBOptimizerSelectivityLevel = "tidb_optimizer_selectivity_level"
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBSkipUTF8Check, TiDBOptAggPushDown,
TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze,
TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, TiDBEnableArrow,
TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, TiDBEnableChunkRPC,
TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction,
TiDBCheckMb4ValueInUTF8, TiDBLowResolutionTSO, TiDBEnableIndexMerge, TiDBEnableNoopFuncs,
TiDBScatterRegion, TiDBGeneralLog, TiDBConstraintCheckInPlace, TiDBEnableVectorizedExpression, TiDBRecordPlanInSlowLog:
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (h *Handle) Clear() {
h.feedback = h.feedback[:0]
h.mu.ctx.GetSessionVars().InitChunkSize = 1
h.mu.ctx.GetSessionVars().MaxChunkSize = 1
h.mu.ctx.GetSessionVars().EnableArrow = false
h.mu.ctx.GetSessionVars().EnableChunkRPC = false
h.mu.ctx.GetSessionVars().ProjectionConcurrency = 0
h.listHead = &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)}
h.globalMap = make(tableDeltaMap)
Expand Down
8 changes: 4 additions & 4 deletions store/mockstore/mocktikv/cop_handler_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,10 @@ func (h *rpcHandler) fillUpData4SelectResponse(selResp *tipb.SelectResponse, dag
switch dagReq.EncodeType {
case tipb.EncodeType_TypeDefault:
h.encodeDefault(selResp, rows, dagReq.OutputOffsets)
case tipb.EncodeType_TypeArrow:
case tipb.EncodeType_TypeChunk:
colTypes := h.constructRespSchema(dagCtx)
loc := dagCtx.evalCtx.sc.TimeZone
err := h.encodeArrow(selResp, rows, colTypes, dagReq.OutputOffsets, loc)
err := h.encodeChunk(selResp, rows, colTypes, dagReq.OutputOffsets, loc)
if err != nil {
return err
}
Expand Down Expand Up @@ -612,7 +612,7 @@ func (h *rpcHandler) encodeDefault(selResp *tipb.SelectResponse, rows [][][]byte
selResp.EncodeType = tipb.EncodeType_TypeDefault
}

func (h *rpcHandler) encodeArrow(selResp *tipb.SelectResponse, rows [][][]byte, colTypes []*types.FieldType, colOrdinal []uint32, loc *time.Location) error {
func (h *rpcHandler) encodeChunk(selResp *tipb.SelectResponse, rows [][][]byte, colTypes []*types.FieldType, colOrdinal []uint32, loc *time.Location) error {
var chunks []tipb.Chunk
respColTypes := make([]*types.FieldType, 0, len(colOrdinal))
for _, ordinal := range colOrdinal {
Expand Down Expand Up @@ -642,7 +642,7 @@ func (h *rpcHandler) encodeArrow(selResp *tipb.SelectResponse, rows [][][]byte,
chk.Reset()
}
selResp.Chunks = chunks
selResp.EncodeType = tipb.EncodeType_TypeArrow
selResp.EncodeType = tipb.EncodeType_TypeChunk
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func loadConfig() string {
var hotReloadConfigItems = []string{"Performance.MaxProcs", "Performance.MaxMemory", "Performance.CrossJoin",
"Performance.FeedbackProbability", "Performance.QueryFeedbackLimit", "Performance.PseudoEstimateRatio",
"OOMUseTmpStorage", "OOMAction", "MemQuotaQuery", "StmtSummary.MaxStmtCount", "StmtSummary.MaxSQLLength", "Log.QueryLogMaxLen",
"TiKVClient.EnableArrow"}
"TiKVClient.EnableChunkRPC"}

func reloadConfig(nc, c *config.Config) {
// Just a part of config items need to be reload explicitly.
Expand Down
2 changes: 1 addition & 1 deletion util/chunk/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *testChunkSuite) TestChunk(c *check.C) {
row := chk.GetRow(0)
c.Assert(row.GetFloat32(0), check.Equals, f32Val)
c.Assert(row.GetTime(2).Compare(tVal), check.Equals, 0)
// fsp no longer maintain in arrow
// fsp is no longer maintained in the chunk format
c.Assert(row.GetDuration(3, 0).Duration, check.DeepEquals, durVal.Duration)
c.Assert(row.GetEnum(4), check.DeepEquals, enumVal)
c.Assert(row.GetSet(5), check.DeepEquals, setVal)
Expand Down