Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support fast analyze. #9973

Closed
wants to merge 72 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
38eb623
ci
lzmhhh123 Mar 26, 2019
f6ee210
ci
lzmhhh123 Mar 28, 2019
f0105d0
ci
lzmhhh123 Mar 31, 2019
f05cf4a
Merge branch 'dev/plug_in_debug_pb' into dev/fast_analyze
lzmhhh123 Mar 31, 2019
586cf13
ci
lzmhhh123 Apr 1, 2019
4cd7147
improve
lzmhhh123 Apr 1, 2019
5c2e687
debug
lzmhhh123 Apr 1, 2019
289268b
improve
lzmhhh123 Apr 2, 2019
957d0a7
add some TODOs
lzmhhh123 Apr 2, 2019
86ea376
debug
lzmhhh123 Apr 2, 2019
6763c80
address comments
lzmhhh123 Apr 2, 2019
f412d13
address comments
lzmhhh123 Apr 2, 2019
e91daaa
add idx collector
lzmhhh123 Apr 2, 2019
0e9640c
ci
lzmhhh123 Apr 4, 2019
925832f
ci
lzmhhh123 Apr 4, 2019
e77c0f1
ci
lzmhhh123 Apr 4, 2019
05fbe0e
handle scan task
lzmhhh123 Apr 4, 2019
61f84d7
address comments
lzmhhh123 Apr 8, 2019
b191070
address comments
lzmhhh123 Apr 8, 2019
a149b6f
debug
lzmhhh123 Apr 8, 2019
b9fab49
improve
lzmhhh123 Apr 8, 2019
1381dba
address comments
lzmhhh123 Apr 8, 2019
c1deecc
ci
lzmhhh123 Apr 9, 2019
eedfef7
change client to snapshot
lzmhhh123 Apr 9, 2019
1552da1
fix ci
lzmhhh123 Apr 9, 2019
00b8a2a
address comments
lzmhhh123 Apr 9, 2019
d0351e3
improve
lzmhhh123 Apr 10, 2019
0ecb10f
address comments
lzmhhh123 Apr 10, 2019
1374c34
Merge branch 'master' into dev/fast_analyze
lzmhhh123 Apr 11, 2019
a3ce332
remove code
lzmhhh123 Apr 11, 2019
2ca551b
fix ci
lzmhhh123 Apr 11, 2019
1a18ee7
debug
lzmhhh123 Apr 13, 2019
5bf7254
add unit tests
lzmhhh123 Apr 14, 2019
fbb123c
squash push
lzmhhh123 Apr 14, 2019
f175b34
Merge branch 'master' into dev/fast_analyze
lzmhhh123 Apr 15, 2019
bf265b0
add test
lzmhhh123 Apr 15, 2019
da5b9eb
Merge branch 'dev/fast_analyze' of https://github.com/lzmhhh123/tidb …
lzmhhh123 Apr 15, 2019
23aef4d
limit bucket size in unit test
lzmhhh123 Apr 15, 2019
2ea1d58
improve
lzmhhh123 Apr 15, 2019
fdb3c91
address comments
lzmhhh123 Apr 16, 2019
890c229
remove global rander
lzmhhh123 Apr 16, 2019
af03002
improve
lzmhhh123 Apr 16, 2019
9254400
Split core into a single pr
erjiaqing Apr 16, 2019
2433427
remove copy and equal from pr
erjiaqing Apr 16, 2019
a7a3a19
address comments
lzmhhh123 Apr 16, 2019
77dc45b
move some code into separate functions
erjiaqing Apr 17, 2019
cfc8a93
update
erjiaqing Apr 17, 2019
6ef86f2
address comment
lzmhhh123 Apr 17, 2019
7008f89
rename some variables
erjiaqing Apr 17, 2019
2055420
upd
erjiaqing Apr 17, 2019
af8e40c
fix
erjiaqing Apr 17, 2019
bfd2128
fix
erjiaqing Apr 17, 2019
8e63f5e
Merge branch 'master' into cms_topn_core
erjiaqing Apr 17, 2019
b244ab1
merge
erjiaqing Apr 17, 2019
40ac221
upd
erjiaqing Apr 17, 2019
d947e96
fix
erjiaqing Apr 17, 2019
6148210
fix
erjiaqing Apr 17, 2019
5472c8a
fix data race
lzmhhh123 Apr 17, 2019
544f215
fix
lzmhhh123 Apr 17, 2019
33aa77e
fix
lzmhhh123 Apr 18, 2019
d4f7684
debug
lzmhhh123 Apr 18, 2019
9cb5128
debug
lzmhhh123 Apr 18, 2019
6b078b5
debug
lzmhhh123 Apr 18, 2019
ba40c91
upd
erjiaqing Apr 18, 2019
c07b72c
some rename
erjiaqing Apr 18, 2019
42d32af
fix
erjiaqing Apr 18, 2019
250d6d6
improve
lzmhhh123 Apr 18, 2019
2d59027
fix
lzmhhh123 Apr 18, 2019
0a76db7
fix test
lzmhhh123 Apr 18, 2019
466428c
Merge remote-tracking branch 'gs/cms_topn_core' into dev/fast_analyze
lzmhhh123 Apr 21, 2019
53f9745
build cmsketch
lzmhhh123 Apr 21, 2019
d29d982
debug the calculation of ndv
lzmhhh123 Apr 21, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 305 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,18 @@
package executor

import (
"bytes"
"context"
"math"
"math/rand"
"runtime"
"sort"
"strconv"
"sync"
"sync/atomic"

"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/tidb/store/tikv/tikvrpc"

alivxxx marked this conversation as resolved.
Show resolved Hide resolved
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
Expand All @@ -27,6 +36,8 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -108,12 +119,16 @@ type taskType int
const (
	// colTask analyzes table columns with the regular pushdown executor.
	colTask taskType = iota
	// idxTask analyzes an index with the regular pushdown executor.
	idxTask
	// colFastTask analyzes table columns via region sampling (fast analyze).
	colFastTask
	// idxFastTask analyzes an index via region sampling (fast analyze).
	idxFastTask
)

// analyzeTask carries exactly one executor, selected by taskType, for a
// single analyze worker to run. Only the field matching taskType is set.
// NOTE: the rendered diff duplicated the old field set; this is the merged
// declaration with each field declared once.
type analyzeTask struct {
	taskType    taskType
	idxExec     *AnalyzeIndexExec
	colExec     *AnalyzeColumnsExec
	idxFastExec *AnalyzeIndexFastExec
	colFastExec *AnalyzeFastExec
}

var errAnalyzeWorkerPanic = errors.New("analyze worker panic")
Expand All @@ -137,6 +152,10 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<-
resultCh <- analyzeColumnsPushdown(task.colExec)
case idxTask:
resultCh <- analyzeIndexPushdown(task.idxExec)
case colFastTask:
resultCh <- analyzeColumnsFastExec(task.colFastExec)
case idxFastTask:
resultCh <- analyzeIndexFastExec(task.idxFastExec)
}
}
}
Expand Down Expand Up @@ -381,3 +400,286 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms []
}
return hists, cms, nil
}

// analyzeIndexFastExec runs the fast analyze on one index and wraps the
// outcome in a statistics.AnalyzeResult suitable for the result channel.
func analyzeIndexFastExec(idxFastExec *AnalyzeIndexFastExec) statistics.AnalyzeResult {
	hist, cms, err := idxFastExec.buildStats()
	if err != nil {
		return statistics.AnalyzeResult{Err: err}
	}
	res := statistics.AnalyzeResult{
		Hist:    []*statistics.Histogram{hist},
		Cms:     []*statistics.CMSketch{cms},
		IsIndex: 1,
	}
	// The last bucket's cumulative count is the total row count.
	if n := hist.Len(); n > 0 {
		res.Count = hist.Buckets[n-1].Count
	}
	return res
}

// AnalyzeIndexFastExec represents Fast Analyze Index executor.
type AnalyzeIndexFastExec struct {
	ctx             sessionctx.Context
	PhysicalTableID int64
	// idxInfo is the metadata of the index being analyzed.
	idxInfo     *model.IndexInfo
	concurrency int
	result      distsql.SelectResult
	// maxNumBuckets caps the number of histogram buckets to build.
	maxNumBuckets uint64
	table         table.Table
}

// buildStats partitions the index key range into regions and classifies each
// region as "scan" (only partially covered by the index range) or "sample"
// (fully covered). Statistics construction itself is still TODO; the method
// currently returns nil stats on success.
//
// Fixes over the draft: the loop now checks the LocateKey error before
// dereferencing loc (nil-pointer risk), and the unused locals that made the
// draft fail to compile were removed.
func (e *AnalyzeIndexFastExec) buildStats() (hist *statistics.Histogram, cms *statistics.CMSketch, err error) {
	store, ok := e.ctx.GetStore().(tikv.Storage)
	if !ok {
		return nil, nil, errors.Errorf("Only support fast analyze in tikv storage.")
	}
	cache := store.GetRegionCache()
	startKey, endKey := tablecodec.GetTableIndexKeyRange(e.table.Meta().ID, e.idxInfo.ID)
	bo := tikv.NewBackoffer(context.Background(), 500)
	var (
		loc      *tikv.KeyLocation
		scanLocs []*tikv.KeyLocation
		sampLocs []*tikv.KeyLocation
	)
	// Walk regions from startKey; append a zero byte to EndKey to step into
	// the next region. err is checked before loc is dereferenced.
	for loc, err = cache.LocateKey(bo, startKey); err == nil && bytes.Compare(loc.StartKey, endKey) <= 0; loc, err = cache.LocateKey(bo, append(loc.EndKey, byte(0))) {
		if bytes.Compare(endKey, loc.EndKey) < 0 || bytes.Compare(loc.StartKey, startKey) < 0 {
			// Region extends beyond the index range: must be scanned.
			scanLocs = append(scanLocs, loc)
		} else {
			// Region lies entirely inside the index range: can be sampled.
			sampLocs = append(sampLocs, loc)
		}
	}
	if err != nil {
		return nil, nil, errors.Trace(err)
	}

	for _, loc := range sampLocs {
		// Validate that the region boundaries decode as index keys.
		// TODO: use the decoded boundary values to estimate per-region row
		// counts and build the histogram/CMSketch.
		if _, _, _, err = tablecodec.DecodeIndexKey(loc.StartKey); err != nil {
			return nil, nil, errors.Trace(err)
		}
		if _, _, _, err = tablecodec.DecodeIndexKey(loc.EndKey); err != nil {
			return nil, nil, errors.Trace(err)
		}
	}
	// TODO: issue scan tasks for the partially covered regions.
	_ = scanLocs

	return nil, nil, nil
}

// analyzeColumnsFastExec runs the fast analyze on table columns and wraps
// the outcome in a statistics.AnalyzeResult for the result channel.
func analyzeColumnsFastExec(colFastExec *AnalyzeFastExec) statistics.AnalyzeResult {
	hist, cms, err := colFastExec.buildStats()
	if err != nil {
		return statistics.AnalyzeResult{Err: err}
	}
	res := statistics.AnalyzeResult{
		Hist:    []*statistics.Histogram{hist},
		Cms:     []*statistics.CMSketch{cms},
		IsIndex: 0,
	}
	// The last bucket's cumulative count is the total row count.
	if n := hist.Len(); n > 0 {
		res.Count = hist.Buckets[n-1].Count
	}
	return res
}

// AnalyzeFastTask is the task for build stats.
type AnalyzeFastTask struct {
	// Location is the region this task covers.
	Location *tikv.KeyLocation
	// isScan marks a region that must be fully scanned rather than sampled.
	isScan bool
	// SampSize is the number of samples to draw from this region.
	SampSize uint64
	// LRowCount/RRowCount are the cumulative row counts at the left and
	// right boundary of this region in the global sampling order.
	LRowCount uint64
	RRowCount uint64
}

// AnalyzeFastExec represents Fast Analyze Columns executor.
type AnalyzeFastExec struct {
ctx sessionctx.Context
PhysicalTableID int64
colsInfo []*model.ColumnInfo
concurrency int
maxNumBuckets uint64
table table.Table
cache *tikv.RegionCache
wg *sync.WaitGroup
sampLocs chan *tikv.KeyLocation
sampLocRowCount uint64
tasks chan *AnalyzeFastTask
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
}

// getSampRegionsRowCount is a worker goroutine: it drains e.sampLocs,
// queries each region's "num_rows" property over the debug RPC, accumulates
// the running total in e.sampLocRowCount, and emits one AnalyzeFastTask per
// region. Failures are reported through the *err out-parameter (the draft
// shadowed it with a local `err :=`, so callers never saw worker errors).
func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild *bool, err *error) {
	defer e.wg.Done()
	client := e.ctx.GetStore().(tikv.Storage).GetTiKVClient()
	for {
		loc, ok := <-e.sampLocs
		if !ok {
			// Channel closed: all sample regions dispatched.
			return
		}
		req := &tikvrpc.Request{
			DebugGetRegionProperties: &debugpb.GetRegionPropertiesRequest{
				RegionId: loc.Region.GetID(),
			},
		}
		rpcCtx, rpcErr := e.cache.GetRPCContext(bo, loc.Region)
		if rpcErr != nil {
			*err = errors.Trace(rpcErr)
			return
		}
		resp, sendErr := client.SendRequest(context.Background(), rpcCtx.Addr, req, tikv.ReadTimeoutMedium)
		if sendErr != nil {
			*err = errors.Trace(sendErr)
			return
		}
		// TODO: deal with region-not-found responses by setting *needRebuild
		// so the caller re-partitions the key range.
		for _, prop := range resp.DebugGetRegionProperties.Props {
			if prop.Name != "num_rows" {
				continue
			}
			cnt, parseErr := strconv.ParseUint(prop.Value, 10, 64)
			if parseErr != nil {
				*err = errors.Trace(parseErr)
				return
			}
			// Reserve [newCount-cnt, newCount) of the global row order for
			// this region; AddUint64 needs the field's address.
			newCount := atomic.AddUint64(&e.sampLocRowCount, cnt)
			e.tasks <- &AnalyzeFastTask{
				Location:  loc,
				LRowCount: newCount - cnt,
				RRowCount: newCount,
			}
		}
	}
}

func (e *AnalyzeFastExec) buildSampTask() (bool, error) {
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
if e.wg == nil {
e.wg = &sync.WaitGroup{}
}
e.wg.Wait()
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved

atomic.StoreUint64(&e.sampLocRowCount, 0)
bo := tikv.NewBackoffer(context.Background(), 500)
needRebuildForRoutine := make([]bool, 0, e.concurrency)
errs := make([]error, 0, e.concurrency)
for i := 0; i < e.concurrency; i++ {
e.wg.Add(1)
go e.getSampRegionsRowCount(bo, &needRebuildForRoutine[i], &errs[i])
}

store, ok := e.ctx.GetStore().(tikv.Storage)
if !ok {
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
return false, errors.Errorf("Only support fast analyze in tikv storage.")
}
e.cache = store.GetRegionCache()
startKey, endKey := tablecodec.GetTableHandleKeyRange(e.table.Meta().ID)
var loc *tikv.KeyLocation
var err error
for loc, err = e.cache.LocateKey(bo, startKey); bytes.Compare(loc.StartKey, endKey) <= 0 && err == nil; loc, err = e.cache.LocateKey(bo, append(loc.EndKey, byte(0))) {
var start, end []byte
start = loc.StartKey
if bytes.Compare(endKey, loc.EndKey) < 0 || bytes.Compare(loc.StartKey, startKey) < 0 {
e.tasks <- &AnalyzeFastTask{Location: loc, isScan: true}
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
} else {
e.sampLocs <- loc
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
}
}
if err != nil {
return false, errors.Trace(err)
}
close(e.sampLocs)
e.wg.Wait()
for i := 0; i < e.concurrency; i++ {
if errs[i] != nil {
return false, errors.Trace(errs[i])
}
}
for i := 0; i < e.concurrency; i++ {
if needRebuildForRoutine[i] == true {
return true, nil
}
}

return false, nil
}

// lowerBound returns the index of the first element of the ascending slice
// array that is >= key, or len(array) when every element is smaller.
// (Draft fixes: `Len` -> `len`, and `mid` was used without declaration.)
func lowerBound(array []uint64, key uint64) int {
	lo, hi := 0, len(array)
	for lo < hi {
		mid := (lo + hi) >> 1
		if array[mid] < key {
			lo = mid + 1
		} else {
			hi = mid
		}
	}
	return lo
}

func (e *AnalyzeFastExec) buildStats() (hist *statistics.Histogram, cms *statistics.CMSketch, err error) {
for {
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
needRebuild, err := e.buildSampTask()
if err != nil {
return nil, nil, errors.Trace(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to call errors.Trace() anymore.

}

if needRebuild {
continue
}

if e.sampLocRowCount < maxSampleSize*10 {
// TODO: return normal Analyze
}

randPos := make([]uint64, 0, maxSampleSize+1)
for i := 0; i < maxSampleSize; i++ {
randPos = append(randPos, uint64(rand.Int63n(e.sampLockRowCount)))
}
randPos = append(randPos, math.MaxUint64)
sort.Sort(randPos)

for _, task := range e.tasks {
if task.isScan == true {
continue
}
task.SampSize = lowerBound(randPos, task.RRowCount) - lowerBound(randPos, task.LRowCount)
}

}
}
40 changes: 32 additions & 8 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1430,20 +1430,44 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) Executor {
tasks: make([]*analyzeTask, 0, len(v.ColTasks)+len(v.IdxTasks)),
}
for _, task := range v.ColTasks {
e.tasks = append(e.tasks, &analyzeTask{
taskType: colTask,
colExec: b.buildAnalyzeColumnsPushdown(task, v.MaxNumBuckets),
})
if v.EnableFastAnalyze {
e.tasks = append(e.tasks, &analyzeTask{
taskType: colFastTask,
colFastExec: &AnalyzeColumnsFastExec{
ctx: b.ctx,
PhysicalTableID: task.PhysicalTableID,
colsInfo: task.ColsInfo,
maxNumBuckets: v.MaxNumBuckets,
},
})
} else {
e.tasks = append(e.tasks, &analyzeTask{
taskType: colTask,
colExec: b.buildAnalyzeColumnsPushdown(task, v.MaxNumBuckets),
})
}
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
}
for _, task := range v.IdxTasks {
e.tasks = append(e.tasks, &analyzeTask{
taskType: idxTask,
idxExec: b.buildAnalyzeIndexPushdown(task, v.MaxNumBuckets),
})
if v.EnableFastAnalyze {
e.tasks = append(e.tasks, &analyzeTask{
taskType: idxFastTask,
idxFastExec: &AnalyzeIndexFastExec{
ctx: b.ctx,
PhysicalTableID: task.PhysicalTableID,
idxInfo: task.IndexInfo,
maxNumBuckets: v.MaxNumBuckets,
},
})
} else {
e.tasks = append(e.tasks, &analyzeTask{
taskType: idxTask,
idxExec: b.buildAnalyzeIndexPushdown(task, v.MaxNumBuckets),
})
}
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
Expand Down
Loading