Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor, util: improve concurrency of statement summary #14490

Merged
merged 9 commits into from
Feb 4, 2020
18 changes: 16 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/stringutil"
Expand Down Expand Up @@ -892,14 +893,26 @@ func (a *ExecStmt) SummaryStmt() {
planGenerator := func() string {
return plannercore.EncodePlan(a.Plan)
}
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
// Generating plan digest is slow, only generate it once if it's 'Point_Get'.
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
// If it's a point get, different SQLs leads to different plans, so SQL digest
// is enough to distinguish different plans in this case.
var planDigest string
var planDigestGen func() string
if a.Plan.TP() == plancodec.TypePointGet {
planDigestGen = func() string {
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
return planDigest
}
} else {
_, planDigest = getPlanDigest(a.Ctx, a.Plan)
}

execDetail := stmtCtx.GetExecDetails()
copTaskInfo := stmtCtx.CopTasksDetails()
memMax := stmtCtx.MemTracker.MaxConsumed()
var userString string
if sessVars.User != nil {
userString = sessVars.User.String()
userString = sessVars.User.Username
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Attention, User.Username doesn't contain the User.Hostname, are you sure about this change?

If User.String() is slow, how bout optimize the funtion, we can use string + string... instead of fmt.Sprintf.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether host name helps when analyzing performance. According to your experience, is it helpful?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rarely use use.host info.

}

stmtsummary.StmtSummaryByDigestMap.AddStatement(&stmtsummary.StmtExecInfo{
Expand All @@ -911,6 +924,7 @@ func (a *ExecStmt) SummaryStmt() {
PrevSQLDigest: prevSQLDigest,
PlanGenerator: planGenerator,
PlanDigest: planDigest,
PlanDigestGen: planDigestGen,
User: userString,
TotalLatency: costTime,
ParseLatency: sessVars.DurationParse,
Expand Down
3 changes: 2 additions & 1 deletion infoschema/perfschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) {
for i := 1; i < 3; i++ {
tk.MustQuery("select b from p where a=1")
expectedResult := fmt.Sprintf("%d \tPoint_Get_1\troot\t1\ttable:p, handle:1", i)
// Also make sure that the plan digest is not empty
tk.MustQuery(`select exec_count, plan
from performance_schema.events_statements_summary_by_digest
where digest_text like 'select b from p%'`,
where digest_text like 'select b from p%' and plan_digest != ''`,
).Check(testkit.Rows(expectedResult))
}

Expand Down
80 changes: 49 additions & 31 deletions util/stmtsummary/statement_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type stmtSummaryByDigest struct {
// It's rare to read concurrently, so RWMutex is not needed.
// Mutex is only used to lock `history`.
sync.Mutex
initialized bool
// Each element in history is a summary in one interval.
history *list.List
// Following fields are common for each summary element.
Expand Down Expand Up @@ -202,6 +203,7 @@ type StmtExecInfo struct {
PrevSQLDigest string
PlanGenerator func() string
PlanDigest string
PlanDigestGen func() string
User string
TotalLatency time.Duration
ParseLatency time.Duration
Expand Down Expand Up @@ -243,15 +245,17 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) {
prevDigest: sei.PrevSQLDigest,
planDigest: sei.PlanDigest,
}
// Calculate hash value in advance, to reduce the time holding the lock.
key.Hash()
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved

// Enclose the block in a function to ensure the lock will always be released.
value, beginTime, ok := func() (kvcache.Value, int64, bool) {
summary, beginTime := func() (*stmtSummaryByDigest, int64) {
ssMap.Lock()
defer ssMap.Unlock()

// Check again. Statements could be added before disabling the flag and after Clear().
if !ssMap.Enabled() {
return nil, 0, false
return nil, 0
}

if ssMap.beginTimeForCurInterval+intervalSeconds <= now {
Expand All @@ -262,16 +266,20 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) {

beginTime := ssMap.beginTimeForCurInterval
value, ok := ssMap.summaryMap.Get(key)
var summary *stmtSummaryByDigest
if !ok {
newSummary := newStmtSummaryByDigest(sei, beginTime, intervalSeconds, historySize)
ssMap.summaryMap.Put(key, newSummary)
// Lazy initialize it to release ssMap.mutex ASAP.
summary = new(stmtSummaryByDigest)
ssMap.summaryMap.Put(key, summary)
} else {
summary = value.(*stmtSummaryByDigest)
}
return value, beginTime, ok
return summary, beginTime
}()

// Lock a single entry, not the whole cache.
if ok {
value.(*stmtSummaryByDigest).add(sei, beginTime, intervalSeconds, historySize)
if summary != nil {
summary.add(sei, beginTime, intervalSeconds, historySize)
}
}

Expand Down Expand Up @@ -326,21 +334,22 @@ func (ssMap *stmtSummaryByDigestMap) GetMoreThanOnceSelect() ([]string, []string
sqls := make([]string, 0, len(values))
for _, value := range values {
ssbd := value.(*stmtSummaryByDigest)
// `stmtType` won't change once created, so locking is not needed.
if ssbd.stmtType == "select" {
func() {
ssbd.Lock()
if ssbd.history.Len() > 0 {
ssElement := ssbd.history.Back().Value.(*stmtSummaryByDigestElement)
ssElement.Lock()
// Empty sample users means that it is an internal queries.
if ssElement.sampleUser != "" && (ssbd.history.Len() > 1 || ssElement.execCount > 1) {
schemas = append(schemas, ssbd.schemaName)
sqls = append(sqls, ssElement.sampleSQL)
defer ssbd.Unlock()
if ssbd.initialized && ssbd.stmtType == "select" {
if ssbd.history.Len() > 0 {
ssElement := ssbd.history.Back().Value.(*stmtSummaryByDigestElement)
ssElement.Lock()
// Empty sample users means that it is an internal queries.
if ssElement.sampleUser != "" && (ssbd.history.Len() > 1 || ssElement.execCount > 1) {
schemas = append(schemas, ssbd.schemaName)
sqls = append(sqls, ssElement.sampleSQL)
}
ssElement.Unlock()
}
ssElement.Unlock()
}
ssbd.Unlock()
}
}()
}
return schemas, sqls
}
Expand Down Expand Up @@ -487,7 +496,7 @@ func (ssMap *stmtSummaryByDigestMap) historySize() int {
}

// newStmtSummaryByDigest creates a stmtSummaryByDigest from StmtExecInfo.
func newStmtSummaryByDigest(sei *StmtExecInfo, beginTime int64, intervalSeconds int64, historySize int) *stmtSummaryByDigest {
func (ssbd *stmtSummaryByDigest) init(sei *StmtExecInfo, beginTime int64, intervalSeconds int64, historySize int) {
// Use "," to separate table names to support FIND_IN_SET.
var buffer bytes.Buffer
for i, value := range sei.StmtCtx.Tables {
Expand All @@ -500,17 +509,19 @@ func newStmtSummaryByDigest(sei *StmtExecInfo, beginTime int64, intervalSeconds
}
tableNames := buffer.String()

ssbd := &stmtSummaryByDigest{
schemaName: sei.SchemaName,
digest: sei.Digest,
planDigest: sei.PlanDigest,
stmtType: strings.ToLower(sei.StmtCtx.StmtType),
normalizedSQL: formatSQL(sei.NormalizedSQL),
tableNames: tableNames,
history: list.New(),
planDigest := sei.PlanDigest
if sei.PlanDigestGen != nil && len(planDigest) == 0 {
// It comes here only when the plan is 'Point_Get'.
planDigest = sei.PlanDigestGen()
}
ssbd.add(sei, beginTime, intervalSeconds, historySize)
return ssbd
ssbd.schemaName = sei.SchemaName
ssbd.digest = sei.Digest
ssbd.planDigest = planDigest
ssbd.stmtType = strings.ToLower(sei.StmtCtx.StmtType)
ssbd.normalizedSQL = formatSQL(sei.NormalizedSQL)
ssbd.tableNames = tableNames
ssbd.history = list.New()
ssbd.initialized = true
}

func (ssbd *stmtSummaryByDigest) add(sei *StmtExecInfo, beginTime int64, intervalSeconds int64, historySize int) {
Expand All @@ -519,6 +530,10 @@ func (ssbd *stmtSummaryByDigest) add(sei *StmtExecInfo, beginTime int64, interva
ssbd.Lock()
defer ssbd.Unlock()

if !ssbd.initialized {
ssbd.init(sei, beginTime, intervalSeconds, historySize)
}

var ssElement *stmtSummaryByDigestElement
isElementNew := true
if ssbd.history.Len() > 0 {
Expand Down Expand Up @@ -556,7 +571,7 @@ func (ssbd *stmtSummaryByDigest) toCurrentDatum(beginTimeForCurInterval int64) [
var ssElement *stmtSummaryByDigestElement

ssbd.Lock()
if ssbd.history.Len() > 0 {
if ssbd.initialized && ssbd.history.Len() > 0 {
ssElement = ssbd.history.Back().Value.(*stmtSummaryByDigestElement)
}
ssbd.Unlock()
Expand Down Expand Up @@ -585,6 +600,9 @@ func (ssbd *stmtSummaryByDigest) collectHistorySummaries(historySize int) []*stm
ssbd.Lock()
defer ssbd.Unlock()

if !ssbd.initialized {
return nil
}
ssElements := make([]*stmtSummaryByDigestElement, 0, ssbd.history.Len())
for listElement := ssbd.history.Front(); listElement != nil && len(ssElements) < historySize; listElement = listElement.Next() {
ssElement := listElement.Value.(*stmtSummaryByDigestElement)
Expand Down
29 changes: 29 additions & 0 deletions util/stmtsummary/statement_summary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func emptyPlanGenerator() string {
return ""
}

func fakePlanDigestGenerator() string {
return "point_get"
}

func (s *testStmtSummarySuite) SetUpSuite(c *C) {
s.ssMap = newStmtSummaryByDigestMap()
s.ssMap.SetEnabled("1", false)
Expand Down Expand Up @@ -998,3 +1002,28 @@ func (s *testStmtSummarySuite) TestEndTime(c *C) {
c.Assert(ssElement.beginTime, LessEqual, now2)
c.Assert(ssElement.endTime-ssElement.beginTime, Equals, int64(60))
}

func (s *testStmtSummarySuite) TestPointGet(c *C) {
s.ssMap.Clear()
now := time.Now().Unix()
s.ssMap.beginTimeForCurInterval = now - 100

stmtExecInfo1 := generateAnyExecInfo()
stmtExecInfo1.PlanDigest = ""
stmtExecInfo1.PlanDigestGen = fakePlanDigestGenerator
s.ssMap.AddStatement(stmtExecInfo1)
key := &stmtSummaryByDigestKey{
schemaName: stmtExecInfo1.SchemaName,
digest: stmtExecInfo1.Digest,
planDigest: "",
}
c.Assert(s.ssMap.summaryMap.Size(), Equals, 1)
value, ok := s.ssMap.summaryMap.Get(key)
c.Assert(ok, IsTrue)
ssbd := value.(*stmtSummaryByDigest)
ssElement := ssbd.history.Back().Value.(*stmtSummaryByDigestElement)
c.Assert(ssElement.execCount, Equals, int64(1))

s.ssMap.AddStatement(stmtExecInfo1)
c.Assert(ssElement.execCount, Equals, int64(2))
}