*: make explain support explain analyze #7827

Merged on Oct 12, 2018 (8 commits).
5 changes: 3 additions & 2 deletions ast/misc.go
@@ -118,8 +118,9 @@ func (n *TraceStmt) Accept(v Visitor) (Node, bool) {
type ExplainStmt struct {
stmtNode

-	Stmt   StmtNode
-	Format string
+	Stmt    StmtNode
+	Format  string
+	Analyze bool
}

// Accept implements Node Accept interface.
4 changes: 2 additions & 2 deletions executor/adapter.go
@@ -361,12 +361,12 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
if sessVars.InRestrictedSQL {
internal = "[INTERNAL] "
}
+execDetail := sessVars.StmtCtx.GetExecDetails()
if costTime < threshold {
logutil.SlowQueryLogger.Debugf(
"[QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
-internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
+internal, costTime, execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
} else {
-execDetail := sessVars.StmtCtx.GetExecDetails()
logutil.SlowQueryLogger.Warnf(
"[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime, execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
10 changes: 9 additions & 1 deletion executor/aggregate.go
@@ -15,6 +15,7 @@ package executor

import (
"sync"
"time"

"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
@@ -501,6 +502,10 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro

// Next implements the Executor Next interface.
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.isUnparallelExec {
return errors.Trace(e.unparallelExec(ctx, chk))
@@ -756,8 +761,11 @@ func (e *StreamAggExec) Close() error {

// Next implements the Executor Next interface.
func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()

for !e.executed && chk.NumRows() < e.maxChunkSize {
err := e.consumeOneGroup(ctx, chk)
if err != nil {
31 changes: 29 additions & 2 deletions executor/builder.go
@@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
@@ -448,13 +449,12 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = chunk.ZeroCapacity
-e := &PrepareExec{
+return &PrepareExec{
baseExecutor: base,
is: b.is,
name: v.Name,
sqlText: v.SQLText,
}
-return e
}

func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
@@ -659,6 +659,33 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) Executor {

// buildExplain builds a explain executor. `e.rows` collects final result to `ExplainExec`.
func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
if v.Analyze {
Member: We should move this to the function body of ExplainExec.Next(). (A sketch of this suggestion follows this hunk.)

stmt := &ExecStmt{
InfoSchema: GetInfoSchema(b.ctx),
Plan: v.ExecPlan,
StmtNode: v.ExecStmt,
Ctx: b.ctx,
}
b.ctx.GetSessionVars().StmtCtx.RuntimeStats = execdetails.NewRuntimeStats()
ctx := context.Background()
rs, err := stmt.Exec(ctx)
if err != nil {
return nil
}
if rs != nil {
chk := rs.NewChunk()
for {
err := rs.Next(ctx, chk)
if err != nil {
return nil
}
if chk.NumRows() == 0 {
break
}
}
}
}
v.PrepareRows()
e := &ExplainExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
}
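
To make the suggestion in the review thread above concrete, here is a rough sketch of how the analyzed statement could be run lazily inside ExplainExec.Next() instead of in buildExplain. This is only an illustration: the analyzeExec and executed fields and the newChunk helper are hypothetical names, not part of this diff.

```go
// Hypothetical sketch, not the PR's implementation: run the analyzed plan on
// the first call to Next(), then emit the explain rows with runtime stats.
func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error {
	chk.Reset()
	if e.analyzeExec != nil && !e.executed {
		e.executed = true
		childChk := e.analyzeExec.newChunk() // hypothetical chunk allocator
		for {
			if err := e.analyzeExec.Next(ctx, childChk); err != nil {
				return errors.Trace(err)
			}
			if childChk.NumRows() == 0 {
				break // child plan fully executed; runtime stats are now populated
			}
		}
	}
	// ... copy the prepared explain rows (now including runtime stats) into chk ...
	return nil
}
```

Keeping the execution out of the builder would also avoid silently returning a nil executor on error, which is what the error paths in the hunk above currently do.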
10 changes: 10 additions & 0 deletions executor/distsql.go
@@ -19,6 +19,7 @@ import (
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/tidb/distsql"
@@ -243,6 +244,10 @@ func (e *IndexReaderExecutor) Close() error {

// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
err := e.result.Next(ctx, chk)
if err != nil {
e.feedback.Invalidate()
@@ -453,6 +458,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) {
lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
e.tblWorkerWg.Add(lookupConcurrencyLimit)
e.baseExecutor.ctx.GetSessionVars().StmtCtx.RuntimeStats.GetRuntimeStat(e.id + "_tableReader")
Member: I think these collected stats are never used by the ExplainExec.

Contributor Author: Is the question about e.id + "_tableReader"?

Member: Because you can only retrieve the runtime statistics of an operator by its operator id.

Contributor Author: Oh, this is by design. GetRuntimeStat pre-initializes the e.id + "_tableReader" slot in RuntimeStatsColl; only after this call do we fork goroutines to create executors, so the pre-initialization prevents a race on the RuntimeStatsColl map and lets RuntimeStatsColl live without a mutex. (A minimal sketch of this pattern follows this file's diff.)

Member: I think we don't need to collect the runtime statistics of the table worker of the IndexLookUpExecutor, because it is never used or presented to the user in the ExplainExec.

Contributor Author: OK, _tableReader can be removed.

for i := 0; i < lookupConcurrencyLimit; i++ {
worker := &tableWorker{
workCh: workCh,
@@ -512,6 +518,10 @@ func (e *IndexLookUpExecutor) Close() error {

// Next implements Exec Next interface.
func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for {
resultTask, err := e.getResultTask()
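
For readers following the thread above, this is a minimal, self-contained Go sketch of the pre-initialization pattern described there. It is not TiDB code and all names are invented for illustration: the parent goroutine creates the map slot before the workers start, so the map itself is never written concurrently and needs no mutex.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stat is the per-operator slot; workers update it only via atomic operations.
type stat struct{ rows int64 }

func (s *stat) record(n int64) { atomic.AddInt64(&s.rows, n) }

// coll maps an operator id to its stat. The map is written only by the
// goroutine that owns it, before any workers are forked.
type coll map[string]*stat

// get pre-initializes the slot for id if necessary and returns it.
func (c coll) get(id string) *stat {
	if s, ok := c[id]; ok {
		return s
	}
	s := &stat{}
	c[id] = s
	return s
}

func main() {
	c := coll{}
	// Pre-initialize the slot before forking workers, so no worker ever
	// mutates the map and no mutex is required around it.
	slot := c.get("IndexLookUp_3_tableReader")

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			slot.record(100) // workers touch only the pre-allocated value
		}()
	}
	wg.Wait()
	fmt.Println(slot.rows) // 400
}
```

The GetRuntimeStat call before the worker loop in startTableWorker plays the role of get here; whether that particular _tableReader slot should be collected at all is the separate question settled in the thread.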
33 changes: 33 additions & 0 deletions executor/executor.go
@@ -18,6 +18,7 @@ import (
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/tidb/ast"
@@ -38,13 +39,15 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

var (
_ Executor = &baseExecutor{}
_ Executor = &CheckTableExec{}
_ Executor = &HashAggExec{}
_ Executor = &LimitExec{}
@@ -75,6 +78,7 @@ type baseExecutor struct {
maxChunkSize int
children []Executor
retFieldTypes []*types.FieldType
runtimeStat *execdetails.RuntimeStat
Member: s/Stat/Stats/ ("stats" is short for "statistics" in our codebase).

Contributor Author (@lysu, Oct 12, 2018): But runtimeStats is a collection of runtimeStat.

Member: How about s/RuntimeStat/ExecutorStats/, and keep RuntimeStats as a collection of ExecutorStats?

Contributor Author: It feels stranger 🤣

Member: Then how about s/RuntimeStats/RuntimeStatsColl/ and s/RuntimeStat/RuntimeStats/?

}

// Open initializes children recursively and "childrenResults" according to children's schemas.
Expand Down Expand Up @@ -130,6 +134,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
schema: schema,
initCap: ctx.GetSessionVars().MaxChunkSize,
maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
runtimeStat: ctx.GetSessionVars().StmtCtx.RuntimeStats.GetRuntimeStat(id),
Member: e.runtimeStat will always be set, no matter whether we are in an explain analyze statement.

Contributor Author: When ctx.GetSessionVars().StmtCtx.RuntimeStats == nil, GetRuntimeStat(id) returns nil right away, so this only adds a single nil assignment, and it looks more uniform. (A sketch of this contract follows this hunk.)

Contributor Author: It doesn't even add an extra memory write, because it is part of the struct initializer.

}
if schema != nil {
cols := schema.Columns
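
As a hedged reading of the exchange above (an assumed shape, not the actual util/execdetails implementation): if the collection's getter returns nil whenever the collection itself is nil, then the unconditional assignment in newBaseExecutor is essentially free for ordinary statements, and each executor's Next() only pays the nil check on e.runtimeStat.

```go
// Assumed shape of the getter; the real util/execdetails code may differ.
// A nil RuntimeStats collection yields a nil per-executor RuntimeStat.
func (rs *RuntimeStats) GetRuntimeStat(id string) *RuntimeStat {
	if rs == nil {
		return nil // not EXPLAIN ANALYZE: buildExplain never installed a collection
	}
	if s, ok := rs.stats[id]; ok { // rs.stats is an assumed internal map
		return s
	}
	s := &RuntimeStat{}
	rs.stats[id] = s
	return s
}
```

Every Next() added in this PR then follows the same guarded pattern, so statements that are not run under EXPLAIN ANALYZE never record anything.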
@@ -172,6 +177,10 @@ type CancelDDLJobsExec struct {

// Next implements the Executor Next interface.
func (e *CancelDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.jobIDs) {
return nil
@@ -614,6 +623,10 @@ type LimitExec struct {

// Next implements the Executor Next interface.
func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.cursor >= e.end {
return nil
@@ -733,6 +746,10 @@ func (e *TableDualExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *TableDualExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.numReturned >= e.numDualRows {
return nil
@@ -784,6 +801,10 @@ func (e *SelectionExec) Close() error {

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)

if !e.batched {
@@ -859,6 +880,10 @@ type TableScanExec struct {

// Next implements the Executor Next interface.
func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.isVirtualTable {
return errors.Trace(e.nextChunk4InfoSchema(ctx, chk))
@@ -959,6 +984,10 @@ func (e *MaxOneRowExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.evaluated {
return nil
@@ -1101,6 +1130,10 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) {

// Next implements the Executor Next interface.
func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if !e.initialized {
e.initialize(ctx)
5 changes: 5 additions & 0 deletions executor/index_lookup_join.go
@@ -18,6 +18,7 @@ import (
"runtime"
"sort"
"sync"
"time"
"unsafe"

"github.com/pingcap/tidb/expression"
@@ -189,6 +190,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork

// Next implements the Executor interface.
func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
e.joinResult.Reset()
for {
9 changes: 9 additions & 0 deletions executor/join.go
@@ -17,6 +17,7 @@ import (
"math"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/tidb/expression"
@@ -508,6 +509,10 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
// step 1. fetch data from inner child and build a hash table;
// step 2. fetch data from outer child in a background goroutine and probe the hash table in multiple join workers.
func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
if !e.prepared {
e.innerFinished = make(chan error, 1)
go util.WithRecovery(func() { e.fetchInnerAndBuildHashTable(ctx) }, e.finishFetchInnerAndBuildHashTable)
@@ -721,6 +726,10 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {

// Next implements the Executor interface.
func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for {
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
6 changes: 6 additions & 0 deletions executor/merge_join.go
@@ -14,6 +14,8 @@
package executor

import (
"time"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/util/chunk"
@@ -261,6 +263,10 @@ func (e *MergeJoinExec) prepare(ctx context.Context, chk *chunk.Chunk) error {

// Next implements the Executor Next interface.
func (e *MergeJoinExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.prepared {
if err := e.prepare(ctx, chk); err != nil {
6 changes: 6 additions & 0 deletions executor/projection.go
@@ -14,6 +14,8 @@
package executor

import (
"time"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
@@ -139,6 +141,10 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
// +------------------------------+ +----------------------+
//
func (e *ProjectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.isUnparallelExec() {
return errors.Trace(e.unParallelExecute(ctx, chk))
9 changes: 9 additions & 0 deletions executor/sort.go
@@ -16,6 +16,7 @@ package executor
import (
"container/heap"
"sort"
"time"

"github.com/pingcap/tidb/expression"
plannercore "github.com/pingcap/tidb/planner/core"
@@ -73,6 +74,10 @@ func (e *SortExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.fetched {
err := e.fetchRowChunks(ctx)
@@ -296,6 +301,10 @@ func (e *TopNExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *TopNExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.fetched {
e.totalLimit = int(e.limit.Offset + e.limit.Count)