Skip to content

Commit

Permalink
Fix up displaying executor runtime info & Add runtime information for…
Browse files Browse the repository at this point in the history
… DML in explain analyze pingcap#18056

Signed-off-by: jianyilyu <jianyilyu@126.com>
  • Loading branch information
jianyilyu committed Aug 10, 2020
1 parent 5e74b33 commit 05cd8db
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 77 deletions.
42 changes: 4 additions & 38 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package executor

import (
"context"
"github.com/pingcap/tidb/store/tikv"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/config"
Expand All @@ -40,9 +39,6 @@ type DeleteExec struct {
// the columns ordinals is present in ordinal range format, @see plannercore.TblColPosInfos
tblColPosInfos plannercore.TblColPosInfoSlice
memTracker *memory.Tracker

stats *pointGetRuntimeStats
snapshot kv.Snapshot
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -85,9 +81,10 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
}
}

// If tidb_batch_delete is ON and not in a transaction, we could use BatchDelete mode.
batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn() && config.GetGlobalConfig().EnableBatchDML
batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize
// If tidb_batch_delete is ON and not in a transaction, we could use BatchDelete mode.
batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn() &&
config.GetGlobalConfig().EnableBatchDML && batchDMLSize > 0
fields := retTypes(e.children[0])
chk := newFirstChunk(e.children[0])
memUsageOfChk := int64(0)
Expand All @@ -105,9 +102,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
e.memTracker.Consume(memUsageOfChk)
for chunkRow := iter.Begin(); chunkRow != iter.End(); chunkRow = iter.Next() {
if batchDelete && rowCount >= batchDMLSize {
if err = e.ctx.StmtCommit(e.memTracker); err != nil {
return err
}
e.ctx.StmtCommit()
if err = e.ctx.NewTxn(ctx); err != nil {
// We should return a special error for batch insert.
return ErrBatchInsertFail.GenWithStack("BatchDelete failed with error: %v", err)
Expand Down Expand Up @@ -218,36 +213,7 @@ func (e *DeleteExec) Close() error {
func (e *DeleteExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}

txnCtx := e.ctx.GetSessionVars().TxnCtx
if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() {
// We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS.
// The snapshot may contains cache that can reduce RPC call.
e.snapshot = txn.GetSnapshot()
} else {
var err error
e.snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: txnCtx.StartTS})
if err != nil {
return err
}
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &pointGetRuntimeStats{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
return e.children[0].Open(ctx)
}

Expand Down
14 changes: 10 additions & 4 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ package executor

import (
"context"
"github.com/pingcap/tidb/store/tikv"
"math"

"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
Expand All @@ -27,13 +25,16 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
"math"
)

// InsertValues is the data to insert.
Expand Down Expand Up @@ -74,10 +75,15 @@ type InsertValues struct {
lazyFillAutoID bool
memTracker *memory.Tracker

stats *pointGetRuntimeStats
stats *RuntimeStatsWithRpcStats
snapshot kv.Snapshot
}

type RuntimeStatsWithRpcStats struct {
*execdetails.BasicRuntimeStats
*tikv.SnapshotRuntimeStats
}

type defaultVal struct {
val types.Datum
// valid indicates whether the val is evaluated. We evaluate the default value lazily.
Expand Down Expand Up @@ -488,7 +494,7 @@ func (e *InsertValues) doBatchInsert(ctx context.Context) error {
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &pointGetRuntimeStats{
e.stats = &RuntimeStatsWithRpcStats{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
Expand Down
36 changes: 1 addition & 35 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ package executor
import (
"context"
"fmt"

"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand All @@ -47,50 +47,16 @@ type UpdateExec struct {
allAssignmentsAreConstant bool
drained bool
memTracker *memory.Tracker

stats *pointGetRuntimeStats
snapshot kv.Snapshot
}

func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, newData []types.Datum) error {
assignFlag, err := plannercore.GetUpdateColumns(e.ctx, e.OrderedList, schema.Len())
if err != nil {
return err
}
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}

txnCtx := e.ctx.GetSessionVars().TxnCtx
if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() {
// We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS.
// The snapshot may contains cache that can reduce RPC call.
e.snapshot = txn.GetSnapshot()
} else {
var err error
e.snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: txnCtx.StartTS})
if err != nil {
return err
}
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &pointGetRuntimeStats{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
if e.updatedRowKeys == nil {
e.updatedRowKeys = make(map[int64]*kv.HandleMap)
}

for _, content := range e.tblColPosInfos {
tbl := e.tblID2table[content.TblID]
if e.updatedRowKeys[content.TblID] == nil {
Expand Down

0 comments on commit 05cd8db

Please sign in to comment.