Skip to content

Commit

Permalink
domain,executor: store topN slow query in domain (#7646)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Sep 12, 2018
1 parent b58a977 commit 6604e33
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 0 deletions.
31 changes: 31 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Domain struct {
etcdClient *clientv3.Client
wg sync.WaitGroup
gvc GlobalVariableCache
slowQuery *topNSlowQueries

MockReloadFailed MockFailure // It mocks reload failed.
}
Expand Down Expand Up @@ -329,6 +330,32 @@ func (do *Domain) Reload() error {
return nil
}

// LogTopNSlowQuery keeps topN recent slow queries in domain.
func (do *Domain) LogTopNSlowQuery(query *SlowQueryInfo) {
select {
case do.slowQuery.ch <- query:
default:
}
}

func (do *Domain) topNSlowQueryLoop() {
defer recoverInDomain("topNSlowQueryLoop", false)
defer do.wg.Done()
ticker := time.NewTicker(time.Minute * 10)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
do.slowQuery.Refresh(now)
case info, ok := <-do.slowQuery.ch:
if !ok {
return
}
do.slowQuery.Append(info)
}
}
}

func (do *Domain) loadSchemaInLoop(lease time.Duration) {
defer do.wg.Done()
// Lease renewal can run at any frequency.
Expand Down Expand Up @@ -408,6 +435,7 @@ func (do *Domain) Close() {
if do.etcdClient != nil {
terror.Log(errors.Trace(do.etcdClient.Close()))
}
do.slowQuery.Close()
do.sysSessionPool.Close()
do.wg.Wait()
log.Info("[domain] close")
Expand Down Expand Up @@ -471,6 +499,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout),
statsLease: statsLease,
infoHandle: infoschema.NewHandle(store),
slowQuery: newTopNSlowQueries(30, time.Hour*24*7),
}
}

Expand Down Expand Up @@ -529,6 +558,8 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
// Local store needs to get the change information for every DDL state in each session.
go do.loadSchemaInLoop(ddlLease)
}
do.wg.Add(1)
go do.topNSlowQueryLoop()

return nil
}
Expand Down
127 changes: 127 additions & 0 deletions domain/topn_slow_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package domain

import (
"container/heap"
"time"

"github.com/pingcap/tidb/util/execdetails"
)

type slowQueryHeap struct {
data []*SlowQueryInfo
}

func (h *slowQueryHeap) Len() int { return len(h.data) }
func (h *slowQueryHeap) Less(i, j int) bool { return h.data[i].Duration < h.data[j].Duration }
func (h *slowQueryHeap) Swap(i, j int) { h.data[i], h.data[j] = h.data[j], h.data[i] }

func (h *slowQueryHeap) Push(x interface{}) {
h.data = append(h.data, x.(*SlowQueryInfo))
}

func (h *slowQueryHeap) Pop() interface{} {
old := h.data
n := len(old)
x := old[n-1]
h.data = old[0 : n-1]
return x
}

func (h *slowQueryHeap) Refresh(now time.Time, recent time.Duration) {
// Remove outdated slow query element.
idx := 0
for i := 0; i < len(h.data); i++ {
outdateTime := h.data[i].Start.Add(recent)
if outdateTime.After(now) {
h.data[idx] = h.data[i]
idx++
}
}
if len(h.data) == idx {
return
}

// Rebuild the heap.
h.data = h.data[:idx]
heap.Init(h)
}

// topNSlowQueries maintains two heaps to store recent slow queries: one for user's and one for internal.
// N = 30, recent = 7 days by default.
type topNSlowQueries struct {
user slowQueryHeap
internal slowQueryHeap
topN int
recent time.Duration
ch chan *SlowQueryInfo
}

func newTopNSlowQueries(topN int, recent time.Duration) *topNSlowQueries {
ret := &topNSlowQueries{
topN: topN,
recent: recent,
ch: make(chan *SlowQueryInfo, 1000),
}
ret.user.data = make([]*SlowQueryInfo, 0, topN)
ret.internal.data = make([]*SlowQueryInfo, 0, topN)
return ret
}

func (q *topNSlowQueries) Append(info *SlowQueryInfo) {
var h *slowQueryHeap
if info.Internal {
h = &q.internal
} else {
h = &q.user
}

// Heap is not full.
if len(h.data) < q.topN {
heap.Push(h, info)
return
}

// Replace the heap top.
if info.Duration > h.data[0].Duration {
heap.Pop(h)
heap.Push(h, info)
}
}

func (q *topNSlowQueries) Refresh(now time.Time) {
q.user.Refresh(now, q.recent)
q.internal.Refresh(now, q.recent)
}

func (q *topNSlowQueries) Close() {
close(q.ch)
}

// SlowQueryInfo is a struct to record slow query info.
type SlowQueryInfo struct {
SQL string
Start time.Time
Duration time.Duration
Detail execdetails.ExecDetails
Succ bool
ConnID uint64
TxnTS uint64
User string
DB string
TableIDs string
IndexIDs string
Internal bool
}
110 changes: 110 additions & 0 deletions domain/topn_slow_query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package domain

import (
"time"

. "github.com/pingcap/check"
)

var _ = Suite(&testTopNSlowQuerySuite{})

type testTopNSlowQuerySuite struct{}

func (t *testTopNSlowQuerySuite) TestPush(c *C) {
slowQuery := newTopNSlowQueries(10, 0)
// Insert data into the heap.
slowQuery.Append(&SlowQueryInfo{Duration: 300 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 400 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 500 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 600 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 700 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 800 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 900 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 1000 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 1100 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 1200 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 300*time.Millisecond)
checkHeap(&slowQuery.user, c)

// Update all data in the heap.
slowQuery.Append(&SlowQueryInfo{Duration: 1300 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 400*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1400 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 500*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1500 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 600*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1500 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 700*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1600 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 800*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1700 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 900*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1800 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 1000*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1900 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 1100*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 2000 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 1200*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 2100 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond)
checkHeap(&slowQuery.user, c)

// Data smaller than heap top will not be inserted.
slowQuery.Append(&SlowQueryInfo{Duration: 1200 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 666 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond)
}

func (t *testTopNSlowQuerySuite) TestRefresh(c *C) {
now := time.Now()
slowQuery := newTopNSlowQueries(6, 3*time.Second)

slowQuery.Append(&SlowQueryInfo{Start: now, Duration: 6})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(1 * time.Second), Duration: 5})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(2 * time.Second), Duration: 4})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(3 * time.Second), Duration: 3})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(4 * time.Second), Duration: 2})
c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond)

slowQuery.Refresh(now.Add(5 * time.Second))
c.Assert(len(slowQuery.user.data), Equals, 2)
c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond)

slowQuery.Append(&SlowQueryInfo{Start: now.Add(3 * time.Second), Duration: 3})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(4 * time.Second), Duration: 2})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(5 * time.Second), Duration: 1})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(6 * time.Second), Duration: 0})
c.Assert(len(slowQuery.user.data), Equals, 6)
c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond)

slowQuery.Refresh(now.Add(6 * time.Second))
c.Assert(len(slowQuery.user.data), Equals, 4)
c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond)
}

func checkHeap(q *slowQueryHeap, c *C) {
for i := 0; i < len(q.data); i++ {
left := 2*i + 1
right := 2*i + 2
if left < len(q.data) {
c.Assert(q.data[i].Duration, LessEqual, q.data[left].Duration)
}
if right < len(q.data) {
c.Assert(q.data[i].Duration, LessEqual, q.data[right].Duration)
}
}
}
19 changes: 19 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -371,6 +372,24 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
logutil.SlowQueryLogger.Warnf(
"[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
var userString string
if user != nil {
userString = user.String()
}
domain.GetDomain(a.Ctx).LogTopNSlowQuery(&domain.SlowQueryInfo{
SQL: sql,
Start: a.startTime,
Duration: costTime,
Detail: sessVars.StmtCtx.GetExecDetails(),
Succ: succ,
ConnID: connID,
TxnTS: txnTS,
User: userString,
DB: currentDB,
TableIDs: tableIDs,
IndexIDs: indexIDs,
Internal: sessVars.InRestrictedSQL,
})
}
}

Expand Down

0 comments on commit 6604e33

Please sign in to comment.