diff --git a/domain/domain.go b/domain/domain.go index 156e1e84e656f..6e5ba7104ead5 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -60,6 +60,7 @@ type Domain struct { etcdClient *clientv3.Client wg sync.WaitGroup gvc GlobalVariableCache + slowQuery *topNSlowQueries MockReloadFailed MockFailure // It mocks reload failed. } @@ -329,6 +330,32 @@ func (do *Domain) Reload() error { return nil } +// LogTopNSlowQuery keeps topN recent slow queries in domain. +func (do *Domain) LogTopNSlowQuery(query *SlowQueryInfo) { + select { + case do.slowQuery.ch <- query: + default: + } +} + +func (do *Domain) topNSlowQueryLoop() { + defer recoverInDomain("topNSlowQueryLoop", false) + defer do.wg.Done() + ticker := time.NewTicker(time.Minute * 10) + defer ticker.Stop() + for { + select { + case now := <-ticker.C: + do.slowQuery.Refresh(now) + case info, ok := <-do.slowQuery.ch: + if !ok { + return + } + do.slowQuery.Append(info) + } + } +} + func (do *Domain) loadSchemaInLoop(lease time.Duration) { defer do.wg.Done() // Lease renewal can run at any frequency. @@ -408,6 +435,7 @@ func (do *Domain) Close() { if do.etcdClient != nil { terror.Log(errors.Trace(do.etcdClient.Close())) } + do.slowQuery.Close() do.sysSessionPool.Close() do.wg.Wait() log.Info("[domain] close") @@ -471,6 +499,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout), statsLease: statsLease, infoHandle: infoschema.NewHandle(store), + slowQuery: newTopNSlowQueries(30, time.Hour*24*7), } } @@ -529,6 +558,8 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R // Local store needs to get the change information for every DDL state in each session. go do.loadSchemaInLoop(ddlLease) } + do.wg.Add(1) + go do.topNSlowQueryLoop() return nil } diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go new file mode 100644 index 0000000000000..b97a9181663ba --- /dev/null +++ b/domain/topn_slow_query.go @@ -0,0 +1,127 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package domain + +import ( + "container/heap" + "time" + + "github.com/pingcap/tidb/util/execdetails" +) + +type slowQueryHeap struct { + data []*SlowQueryInfo +} + +func (h *slowQueryHeap) Len() int { return len(h.data) } +func (h *slowQueryHeap) Less(i, j int) bool { return h.data[i].Duration < h.data[j].Duration } +func (h *slowQueryHeap) Swap(i, j int) { h.data[i], h.data[j] = h.data[j], h.data[i] } + +func (h *slowQueryHeap) Push(x interface{}) { + h.data = append(h.data, x.(*SlowQueryInfo)) +} + +func (h *slowQueryHeap) Pop() interface{} { + old := h.data + n := len(old) + x := old[n-1] + h.data = old[0 : n-1] + return x +} + +func (h *slowQueryHeap) Refresh(now time.Time, recent time.Duration) { + // Remove outdated slow query element. + idx := 0 + for i := 0; i < len(h.data); i++ { + outdateTime := h.data[i].Start.Add(recent) + if outdateTime.After(now) { + h.data[idx] = h.data[i] + idx++ + } + } + if len(h.data) == idx { + return + } + + // Rebuild the heap. + h.data = h.data[:idx] + heap.Init(h) +} + +// topNSlowQueries maintains two heaps to store recent slow queries: one for user's and one for internal. +// N = 30, recent = 7 days by default. +type topNSlowQueries struct { + user slowQueryHeap + internal slowQueryHeap + topN int + recent time.Duration + ch chan *SlowQueryInfo +} + +func newTopNSlowQueries(topN int, recent time.Duration) *topNSlowQueries { + ret := &topNSlowQueries{ + topN: topN, + recent: recent, + ch: make(chan *SlowQueryInfo, 1000), + } + ret.user.data = make([]*SlowQueryInfo, 0, topN) + ret.internal.data = make([]*SlowQueryInfo, 0, topN) + return ret +} + +func (q *topNSlowQueries) Append(info *SlowQueryInfo) { + var h *slowQueryHeap + if info.Internal { + h = &q.internal + } else { + h = &q.user + } + + // Heap is not full. + if len(h.data) < q.topN { + heap.Push(h, info) + return + } + + // Replace the heap top. + if info.Duration > h.data[0].Duration { + heap.Pop(h) + heap.Push(h, info) + } +} + +func (q *topNSlowQueries) Refresh(now time.Time) { + q.user.Refresh(now, q.recent) + q.internal.Refresh(now, q.recent) +} + +func (q *topNSlowQueries) Close() { + close(q.ch) +} + +// SlowQueryInfo is a struct to record slow query info. +type SlowQueryInfo struct { + SQL string + Start time.Time + Duration time.Duration + Detail execdetails.ExecDetails + Succ bool + ConnID uint64 + TxnTS uint64 + User string + DB string + TableIDs string + IndexIDs string + Internal bool +} diff --git a/domain/topn_slow_query_test.go b/domain/topn_slow_query_test.go new file mode 100644 index 0000000000000..c804e8ba75b17 --- /dev/null +++ b/domain/topn_slow_query_test.go @@ -0,0 +1,110 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package domain + +import ( + "time" + + . "github.com/pingcap/check" +) + +var _ = Suite(&testTopNSlowQuerySuite{}) + +type testTopNSlowQuerySuite struct{} + +func (t *testTopNSlowQuerySuite) TestPush(c *C) { + slowQuery := newTopNSlowQueries(10, 0) + // Insert data into the heap. + slowQuery.Append(&SlowQueryInfo{Duration: 300 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 400 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 500 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 600 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 700 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 800 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 900 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 1000 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 1100 * time.Millisecond}) + slowQuery.Append(&SlowQueryInfo{Duration: 1200 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 300*time.Millisecond) + checkHeap(&slowQuery.user, c) + + // Update all data in the heap. + slowQuery.Append(&SlowQueryInfo{Duration: 1300 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 400*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1400 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 500*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1500 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 600*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1500 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 700*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1600 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 800*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1700 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 900*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1800 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 1000*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 1900 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 1100*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 2000 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 1200*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 2100 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond) + checkHeap(&slowQuery.user, c) + + // Data smaller than heap top will not be inserted. + slowQuery.Append(&SlowQueryInfo{Duration: 1200 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond) + slowQuery.Append(&SlowQueryInfo{Duration: 666 * time.Millisecond}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond) +} + +func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { + now := time.Now() + slowQuery := newTopNSlowQueries(6, 3*time.Second) + + slowQuery.Append(&SlowQueryInfo{Start: now, Duration: 6}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(1 * time.Second), Duration: 5}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(2 * time.Second), Duration: 4}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(3 * time.Second), Duration: 3}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(4 * time.Second), Duration: 2}) + c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond) + + slowQuery.Refresh(now.Add(5 * time.Second)) + c.Assert(len(slowQuery.user.data), Equals, 2) + c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond) + + slowQuery.Append(&SlowQueryInfo{Start: now.Add(3 * time.Second), Duration: 3}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(4 * time.Second), Duration: 2}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(5 * time.Second), Duration: 1}) + slowQuery.Append(&SlowQueryInfo{Start: now.Add(6 * time.Second), Duration: 0}) + c.Assert(len(slowQuery.user.data), Equals, 6) + c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond) + + slowQuery.Refresh(now.Add(6 * time.Second)) + c.Assert(len(slowQuery.user.data), Equals, 4) + c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond) +} + +func checkHeap(q *slowQueryHeap, c *C) { + for i := 0; i < len(q.data); i++ { + left := 2*i + 1 + right := 2*i + 2 + if left < len(q.data) { + c.Assert(q.data[i].Duration, LessEqual, q.data[left].Duration) + } + if right < len(q.data) { + c.Assert(q.data[i].Duration, LessEqual, q.data[right].Duration) + } + } +} diff --git a/executor/adapter.go b/executor/adapter.go index 5f3ace537eba9..642bdb2b86c16 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -22,6 +22,7 @@ import ( "github.com/juju/errors" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -371,6 +372,24 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) { logutil.SlowQueryLogger.Warnf( "[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v", internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql) + var userString string + if user != nil { + userString = user.String() + } + domain.GetDomain(a.Ctx).LogTopNSlowQuery(&domain.SlowQueryInfo{ + SQL: sql, + Start: a.startTime, + Duration: costTime, + Detail: sessVars.StmtCtx.GetExecDetails(), + Succ: succ, + ConnID: connID, + TxnTS: txnTS, + User: userString, + DB: currentDB, + TableIDs: tableIDs, + IndexIDs: indexIDs, + Internal: sessVars.InRestrictedSQL, + }) } }