executor, util: improve concurrency of statement summary (#14490)
djshow832 committed Feb 4, 2020
1 parent 80d515c commit 43105cb
Showing 4 changed files with 96 additions and 33 deletions.
18 changes: 16 additions & 2 deletions executor/adapter.go
@@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/stringutil"
@@ -808,14 +809,26 @@ func (a *ExecStmt) SummaryStmt() {
planGenerator := func() string {
return plannercore.EncodePlan(a.Plan)
}
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
// Generating the plan digest is slow, so generate it lazily (at most once) when the plan is 'Point_Get'.
// For a point get, different SQLs lead to different plans, so the SQL digest
// is enough to distinguish different plans in this case.
var planDigest string
var planDigestGen func() string
if a.Plan.TP() == plancodec.TypePointGet {
planDigestGen = func() string {
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
return planDigest
}
} else {
_, planDigest = getPlanDigest(a.Ctx, a.Plan)
}

execDetail := stmtCtx.GetExecDetails()
copTaskInfo := stmtCtx.CopTasksDetails()
memMax := stmtCtx.MemTracker.MaxConsumed()
var userString string
if sessVars.User != nil {
userString = sessVars.User.String()
userString = sessVars.User.Username
}

stmtsummary.StmtSummaryByDigestMap.AddStatement(&stmtsummary.StmtExecInfo{
@@ -827,6 +840,7 @@ func (a *ExecStmt) SummaryStmt() {
PrevSQLDigest: prevSQLDigest,
PlanGenerator: planGenerator,
PlanDigest: planDigest,
PlanDigestGen: planDigestGen,
User: userString,
TotalLatency: costTime,
ParseLatency: sessVars.DurationParse,
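
The SummaryStmt change above avoids paying for plan-digest encoding on every execution of a point get: for 'Point_Get' plans the digest is wrapped in a closure and computed only if a new summary entry actually needs it, while other plans still compute it eagerly. A minimal standalone sketch of that eager-vs-lazy pattern (expensiveDigest, summarize, and isPointGet are illustrative names, not the TiDB API):

package main

import "fmt"

// expensiveDigest stands in for a slow computation such as encoding a plan tree.
func expensiveDigest(plan string) string {
    return "digest(" + plan + ")"
}

// summarize returns either an eagerly computed digest or a generator that
// computes it on demand, depending on the plan type.
func summarize(plan string, isPointGet bool) (digest string, digestGen func() string) {
    if isPointGet {
        // Defer the slow work; the caller invokes digestGen only when a
        // digest is actually needed (e.g. when creating a new summary entry).
        return "", func() string { return expensiveDigest(plan) }
    }
    // Other plans compute the digest up front.
    return expensiveDigest(plan), nil
}

func main() {
    digest, gen := summarize("Point_Get_1", true)
    if digest == "" && gen != nil {
        digest = gen() // computed lazily, at most once per new entry
    }
    fmt.Println(digest)
}

An empty digest plus a non-nil generator acts as "compute on demand", which mirrors how init() in statement_summary.go falls back to PlanDigestGen when PlanDigest is empty.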
3 changes: 2 additions & 1 deletion infoschema/perfschema/tables_test.go
@@ -108,9 +108,10 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) {
for i := 1; i < 3; i++ {
tk.MustQuery("select b from p where a=1")
expectedResult := fmt.Sprintf("%d \tPoint_Get_1\troot\t1\ttable:p, handle:1", i)
// Also make sure that the plan digest is not empty
tk.MustQuery(`select exec_count, plan
from performance_schema.events_statements_summary_by_digest
where digest_text like 'select b from p%'`,
where digest_text like 'select b from p%' and plan_digest != ''`,
).Check(testkit.Rows(expectedResult))
}

79 changes: 49 additions & 30 deletions util/stmtsummary/statement_summary.go
@@ -107,6 +107,7 @@ type stmtSummaryByDigest struct {
// It's rare to read concurrently, so RWMutex is not needed.
// Mutex is only used to lock `history`.
sync.Mutex
initialized bool
// Each element in history is a summary in one interval.
history *list.List
// Following fields are common for each summary element.
@@ -203,6 +204,7 @@ type StmtExecInfo struct {
PrevSQLDigest string
PlanGenerator func() string
PlanDigest string
PlanDigestGen func() string
User string
TotalLatency time.Duration
ParseLatency time.Duration
@@ -244,15 +246,17 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) {
prevDigest: sei.PrevSQLDigest,
planDigest: sei.PlanDigest,
}
// Calculate the hash value in advance to reduce the time the lock is held.
key.Hash()

// Enclose the block in a function to ensure the lock will always be released.
value, beginTime, ok := func() (kvcache.Value, int64, bool) {
summary, beginTime := func() (*stmtSummaryByDigest, int64) {
ssMap.Lock()
defer ssMap.Unlock()

// Check again. Statements could be added before disabling the flag and after Clear().
if !ssMap.Enabled() {
return nil, 0, false
return nil, 0
}

if ssMap.beginTimeForCurInterval+intervalSeconds <= now {
@@ -263,16 +267,20 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) {

beginTime := ssMap.beginTimeForCurInterval
value, ok := ssMap.summaryMap.Get(key)
var summary *stmtSummaryByDigest
if !ok {
newSummary := newStmtSummaryByDigest(sei, beginTime, intervalSeconds, historySize)
ssMap.summaryMap.Put(key, newSummary)
// Lazy initialize it to release ssMap.mutex ASAP.
summary = new(stmtSummaryByDigest)
ssMap.summaryMap.Put(key, summary)
} else {
summary = value.(*stmtSummaryByDigest)
}
return value, beginTime, ok
return summary, beginTime
}()

// Lock a single entry, not the whole cache.
if ok {
value.(*stmtSummaryByDigest).add(sei, beginTime, intervalSeconds, historySize)
if summary != nil {
summary.add(sei, beginTime, intervalSeconds, historySize)
}
}
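
Taken together, the AddStatement changes above narrow the critical section of the map-wide lock: the key's hash is precomputed, only an empty *stmtSummaryByDigest placeholder is inserted while ssMap is locked, and the expensive per-statement bookkeeping happens afterwards under that entry's own lock. A rough, self-contained sketch of this get-or-create-then-fill pattern (registry and entry are hypothetical types, not the TiDB ones):

package main

import "sync"

// entry mimics a per-digest summary that is created empty and filled in later.
type entry struct {
    sync.Mutex
    initialized bool
    execCount   int64
}

// registry mimics the digest -> summary cache guarded by a single mutex.
type registry struct {
    sync.Mutex
    entries map[string]*entry
}

// getOrCreate holds the registry-wide lock only long enough to look up or
// insert an empty placeholder; callers fill it in under the entry's own lock.
func (r *registry) getOrCreate(key string) *entry {
    r.Lock()
    defer r.Unlock()
    e, ok := r.entries[key]
    if !ok {
        e = &entry{} // lazily initialized by the first add
        r.entries[key] = e
    }
    return e
}

func (r *registry) add(key string) {
    e := r.getOrCreate(key)
    // Lock a single entry, not the whole registry, for the heavy work.
    e.Lock()
    defer e.Unlock()
    if !e.initialized {
        e.initialized = true // expensive one-time setup would go here
    }
    e.execCount++
}

func main() {
    r := &registry{entries: make(map[string]*entry)}
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            r.add("select b from p where a = ?")
        }()
    }
    wg.Wait()
}

With this split, concurrent writers contend on individual entries rather than on the single cache mutex.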

@@ -327,20 +335,22 @@ func (ssMap *stmtSummaryByDigestMap) GetMoreThanOnceSelect() ([]string, []string
sqls := make([]string, 0, len(values))
for _, value := range values {
ssbd := value.(*stmtSummaryByDigest)
// `stmtType` won't change once created, so locking is not needed.
if ssbd.stmtType == "select" {
func() {
ssbd.Lock()
if ssbd.history.Len() > 0 {
ssElement := ssbd.history.Back().Value.(*stmtSummaryByDigestElement)
ssElement.Lock()
if ssbd.history.Len() > 1 || ssElement.execCount > 1 {
schemas = append(schemas, ssbd.schemaName)
sqls = append(sqls, ssElement.sampleSQL)
defer ssbd.Unlock()
if ssbd.initialized && ssbd.stmtType == "select" {
if ssbd.history.Len() > 0 {
ssElement := ssbd.history.Back().Value.(*stmtSummaryByDigestElement)
ssElement.Lock()
// An empty sample user means that it is an internal query.
if ssElement.sampleUser != "" && (ssbd.history.Len() > 1 || ssElement.execCount > 1) {
schemas = append(schemas, ssbd.schemaName)
sqls = append(sqls, ssElement.sampleSQL)
}
ssElement.Unlock()
}
ssElement.Unlock()
}
ssbd.Unlock()
}
}()
}
return schemas, sqls
}
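
In the rewritten GetMoreThanOnceSelect above, the lock-protected work is wrapped in an anonymous function so that the deferred ssbd.Unlock() runs at the end of each iteration; a defer placed directly in the loop body would not fire until the whole function returns, holding every entry's lock at once. A small illustrative sketch of this per-iteration defer pattern (generic item/collect names, not the TiDB code):

package main

import (
    "fmt"
    "sync"
)

type item struct {
    sync.Mutex
    name string
}

// collect locks each item for as short a time as possible. The anonymous
// function scopes the deferred Unlock to one iteration; deferring directly in
// the loop body would postpone every Unlock until collect returns.
func collect(items []*item) []string {
    out := make([]string, 0, len(items))
    for _, it := range items {
        func() {
            it.Lock()
            defer it.Unlock()
            out = append(out, it.name)
        }()
    }
    return out
}

func main() {
    fmt.Println(collect([]*item{{name: "a"}, {name: "b"}}))
}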
@@ -487,7 +497,7 @@ func (ssMap *stmtSummaryByDigestMap) historySize() int {
}

// newStmtSummaryByDigest creates a stmtSummaryByDigest from StmtExecInfo.
func newStmtSummaryByDigest(sei *StmtExecInfo, beginTime int64, intervalSeconds int64, historySize int) *stmtSummaryByDigest {
func (ssbd *stmtSummaryByDigest) init(sei *StmtExecInfo, beginTime int64, intervalSeconds int64, historySize int) {
// Use "," to separate table names to support FIND_IN_SET.
var buffer bytes.Buffer
for i, value := range sei.StmtCtx.Tables {
@@ -500,17 +510,19 @@ func newStmtSummaryByDigest(sei *StmtExecInfo, beginTime int64, intervalSeconds
}
tableNames := buffer.String()

ssbd := &stmtSummaryByDigest{
schemaName: sei.SchemaName,
digest: sei.Digest,
planDigest: sei.PlanDigest,
stmtType: strings.ToLower(sei.StmtCtx.StmtType),
normalizedSQL: formatSQL(sei.NormalizedSQL),
tableNames: tableNames,
history: list.New(),
planDigest := sei.PlanDigest
if sei.PlanDigestGen != nil && len(planDigest) == 0 {
// Execution reaches here only when the plan is 'Point_Get'.
planDigest = sei.PlanDigestGen()
}
ssbd.add(sei, beginTime, intervalSeconds, historySize)
return ssbd
ssbd.schemaName = sei.SchemaName
ssbd.digest = sei.Digest
ssbd.planDigest = planDigest
ssbd.stmtType = strings.ToLower(sei.StmtCtx.StmtType)
ssbd.normalizedSQL = formatSQL(sei.NormalizedSQL)
ssbd.tableNames = tableNames
ssbd.history = list.New()
ssbd.initialized = true
}

func (ssbd *stmtSummaryByDigest) add(sei *StmtExecInfo, beginTime int64, intervalSeconds int64, historySize int) {
@@ -519,6 +531,10 @@ func (ssbd *stmtSummaryByDigest) add(sei *StmtExecInfo, beginTime int64, interva
ssbd.Lock()
defer ssbd.Unlock()

if !ssbd.initialized {
ssbd.init(sei, beginTime, intervalSeconds, historySize)
}

var ssElement *stmtSummaryByDigestElement
isElementNew := true
if ssbd.history.Len() > 0 {
Expand Down Expand Up @@ -556,7 +572,7 @@ func (ssbd *stmtSummaryByDigest) toCurrentDatum(beginTimeForCurInterval int64) [
var ssElement *stmtSummaryByDigestElement

ssbd.Lock()
if ssbd.history.Len() > 0 {
if ssbd.initialized && ssbd.history.Len() > 0 {
ssElement = ssbd.history.Back().Value.(*stmtSummaryByDigestElement)
}
ssbd.Unlock()
Expand Down Expand Up @@ -585,6 +601,9 @@ func (ssbd *stmtSummaryByDigest) collectHistorySummaries(historySize int) []*stm
ssbd.Lock()
defer ssbd.Unlock()

if !ssbd.initialized {
return nil
}
ssElements := make([]*stmtSummaryByDigestElement, 0, ssbd.history.Len())
for listElement := ssbd.history.Front(); listElement != nil && len(ssElements) < historySize; listElement = listElement.Next() {
ssElement := listElement.Value.(*stmtSummaryByDigestElement)
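
Because AddStatement now publishes an uninitialized placeholder into the cache and fills it in later, the readers in this file (GetMoreThanOnceSelect, toCurrentDatum, collectHistorySummaries) first check ssbd.initialized under the entry's lock and skip entries that a writer has not finished setting up. A compact sketch of that reader-side guard (summary and snapshot are illustrative names, not the TiDB types):

package main

import (
    "fmt"
    "sync"
)

// summary mimics a cache entry that may still be an uninitialized placeholder.
type summary struct {
    sync.Mutex
    initialized bool
    sampleSQL   string
}

// snapshot skips placeholders that a writer has inserted but not yet filled in.
func snapshot(entries []*summary) []string {
    var sqls []string
    for _, s := range entries {
        s.Lock()
        if s.initialized {
            sqls = append(sqls, s.sampleSQL)
        }
        s.Unlock()
    }
    return sqls
}

func main() {
    ready := &summary{initialized: true, sampleSQL: "select b from p where a = 1"}
    placeholder := &summary{} // visible in the cache, but not initialized yet
    fmt.Println(snapshot([]*summary{ready, placeholder}))
}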
29 changes: 29 additions & 0 deletions util/stmtsummary/statement_summary_test.go
@@ -40,6 +40,10 @@ func emptyPlanGenerator() string {
return ""
}

func fakePlanDigestGenerator() string {
return "point_get"
}

func (s *testStmtSummarySuite) SetUpSuite(c *C) {
s.ssMap = newStmtSummaryByDigestMap()
s.ssMap.SetEnabled("1", false)
@@ -998,3 +1002,28 @@ func (s *testStmtSummarySuite) TestEndTime(c *C) {
c.Assert(ssElement.beginTime, LessEqual, now2)
c.Assert(ssElement.endTime-ssElement.beginTime, Equals, int64(60))
}

func (s *testStmtSummarySuite) TestPointGet(c *C) {
s.ssMap.Clear()
now := time.Now().Unix()
s.ssMap.beginTimeForCurInterval = now - 100

stmtExecInfo1 := generateAnyExecInfo()
stmtExecInfo1.PlanDigest = ""
stmtExecInfo1.PlanDigestGen = fakePlanDigestGenerator
s.ssMap.AddStatement(stmtExecInfo1)
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: "",
}
c.Assert(s.ssMap.summaryMap.Size(), Equals, 1)
value, ok := s.ssMap.summaryMap.Get(key)
c.Assert(ok, IsTrue)
ssbd := value.(*stmtSummaryByDigest)
ssElement := ssbd.history.Back().Value.(*stmtSummaryByDigestElement)
c.Assert(ssElement.execCount, Equals, int64(1))

s.ssMap.AddStatement(stmtExecInfo1)
c.Assert(ssElement.execCount, Equals, int64(2))
}
