Skip to content

Commit

Permalink
Merge branch 'master' of github.com:pingcap/tidb into infoschema
Browse files Browse the repository at this point in the history
  • Loading branch information
Haibin Xie committed Sep 12, 2018
2 parents 7256b06 + a6d3a18 commit 2758da5
Show file tree
Hide file tree
Showing 36 changed files with 1,154 additions and 719 deletions.
5 changes: 4 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,9 @@ func checkJobMaxInterval(job *model.Job) time.Duration {
if job.Type == model.ActionAddIndex {
return 3 * time.Second
}
if job.Type == model.ActionCreateTable || job.Type == model.ActionCreateSchema {
return 500 * time.Millisecond
}
return 1 * time.Second
}

Expand Down Expand Up @@ -484,7 +487,7 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
jobID := job.ID
// For a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
// For every state changes, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease.
// But we use etcd to speed up, normally it takes less than 1s now, so we use 1s or 3s as the max value.
// But we use etcd to speed up, normally it takes less than 0.5s now, so we use 0.5s or 1s or 3s as the max value.
ticker := time.NewTicker(chooseLeaseTime(10*d.lease, checkJobMaxInterval(job)))
startTime := time.Now()
metrics.JobsGauge.WithLabelValues(job.Type.String()).Inc()
Expand Down
50 changes: 0 additions & 50 deletions docs/ROADMAP.md

This file was deleted.

File renamed without changes.
1 change: 1 addition & 0 deletions docs/design/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ A proposal template: [TEMPLATE.md](./TEMPLATE.md)
- [Proposal: A new aggregate function execution framework](./2018-07-01-refactor-aggregate-framework.md)
- [Proposal: A new storage row format for efficient decoding](./2018-07-19-row-format.md)
- [Proposal: A new command to restore dropped table](./2018-08-10-restore-dropped-table.md)
- [Proposal: Infer the System Timezone of a TiDB cluster via TZ environment variable](./2018-09-10-adding-tz-env.md)

## In Progress

Expand Down
31 changes: 31 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Domain struct {
etcdClient *clientv3.Client
wg sync.WaitGroup
gvc GlobalVariableCache
slowQuery *topNSlowQueries

MockReloadFailed MockFailure // It mocks reload failed.
}
Expand Down Expand Up @@ -329,6 +330,32 @@ func (do *Domain) Reload() error {
return nil
}

// LogTopNSlowQuery keeps topN recent slow queries in domain.
func (do *Domain) LogTopNSlowQuery(query *SlowQueryInfo) {
select {
case do.slowQuery.ch <- query:
default:
}
}

func (do *Domain) topNSlowQueryLoop() {
defer recoverInDomain("topNSlowQueryLoop", false)
defer do.wg.Done()
ticker := time.NewTicker(time.Minute * 10)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
do.slowQuery.Refresh(now)
case info, ok := <-do.slowQuery.ch:
if !ok {
return
}
do.slowQuery.Append(info)
}
}
}

func (do *Domain) loadSchemaInLoop(lease time.Duration) {
defer do.wg.Done()
// Lease renewal can run at any frequency.
Expand Down Expand Up @@ -408,6 +435,7 @@ func (do *Domain) Close() {
if do.etcdClient != nil {
terror.Log(errors.Trace(do.etcdClient.Close()))
}
do.slowQuery.Close()
do.sysSessionPool.Close()
do.wg.Wait()
log.Info("[domain] close")
Expand Down Expand Up @@ -471,6 +499,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout),
statsLease: statsLease,
infoHandle: infoschema.NewHandle(store),
slowQuery: newTopNSlowQueries(30, time.Hour*24*7),
}
}

Expand Down Expand Up @@ -529,6 +558,8 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
// Local store needs to get the change information for every DDL state in each session.
go do.loadSchemaInLoop(ddlLease)
}
do.wg.Add(1)
go do.topNSlowQueryLoop()

return nil
}
Expand Down
127 changes: 127 additions & 0 deletions domain/topn_slow_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package domain

import (
"container/heap"
"time"

"github.com/pingcap/tidb/util/execdetails"
)

type slowQueryHeap struct {
data []*SlowQueryInfo
}

func (h *slowQueryHeap) Len() int { return len(h.data) }
func (h *slowQueryHeap) Less(i, j int) bool { return h.data[i].Duration < h.data[j].Duration }
func (h *slowQueryHeap) Swap(i, j int) { h.data[i], h.data[j] = h.data[j], h.data[i] }

func (h *slowQueryHeap) Push(x interface{}) {
h.data = append(h.data, x.(*SlowQueryInfo))
}

func (h *slowQueryHeap) Pop() interface{} {
old := h.data
n := len(old)
x := old[n-1]
h.data = old[0 : n-1]
return x
}

func (h *slowQueryHeap) Refresh(now time.Time, recent time.Duration) {
// Remove outdated slow query element.
idx := 0
for i := 0; i < len(h.data); i++ {
outdateTime := h.data[i].Start.Add(recent)
if outdateTime.After(now) {
h.data[idx] = h.data[i]
idx++
}
}
if len(h.data) == idx {
return
}

// Rebuild the heap.
h.data = h.data[:idx]
heap.Init(h)
}

// topNSlowQueries maintains two heaps to store recent slow queries: one for user's and one for internal.
// N = 30, recent = 7 days by default.
type topNSlowQueries struct {
user slowQueryHeap
internal slowQueryHeap
topN int
recent time.Duration
ch chan *SlowQueryInfo
}

func newTopNSlowQueries(topN int, recent time.Duration) *topNSlowQueries {
ret := &topNSlowQueries{
topN: topN,
recent: recent,
ch: make(chan *SlowQueryInfo, 1000),
}
ret.user.data = make([]*SlowQueryInfo, 0, topN)
ret.internal.data = make([]*SlowQueryInfo, 0, topN)
return ret
}

func (q *topNSlowQueries) Append(info *SlowQueryInfo) {
var h *slowQueryHeap
if info.Internal {
h = &q.internal
} else {
h = &q.user
}

// Heap is not full.
if len(h.data) < q.topN {
heap.Push(h, info)
return
}

// Replace the heap top.
if info.Duration > h.data[0].Duration {
heap.Pop(h)
heap.Push(h, info)
}
}

func (q *topNSlowQueries) Refresh(now time.Time) {
q.user.Refresh(now, q.recent)
q.internal.Refresh(now, q.recent)
}

func (q *topNSlowQueries) Close() {
close(q.ch)
}

// SlowQueryInfo is a struct to record slow query info.
type SlowQueryInfo struct {
SQL string
Start time.Time
Duration time.Duration
Detail execdetails.ExecDetails
Succ bool
ConnID uint64
TxnTS uint64
User string
DB string
TableIDs string
IndexIDs string
Internal bool
}
110 changes: 110 additions & 0 deletions domain/topn_slow_query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package domain

import (
"time"

. "github.com/pingcap/check"
)

var _ = Suite(&testTopNSlowQuerySuite{})

type testTopNSlowQuerySuite struct{}

func (t *testTopNSlowQuerySuite) TestPush(c *C) {
slowQuery := newTopNSlowQueries(10, 0)
// Insert data into the heap.
slowQuery.Append(&SlowQueryInfo{Duration: 300 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 400 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 500 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 600 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 700 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 800 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 900 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 1000 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 1100 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 1200 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 300*time.Millisecond)
checkHeap(&slowQuery.user, c)

// Update all data in the heap.
slowQuery.Append(&SlowQueryInfo{Duration: 1300 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 400*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1400 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 500*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1500 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 600*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1500 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 700*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1600 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 800*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1700 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 900*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1800 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 1000*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 1900 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 1100*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 2000 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 1200*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 2100 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond)
checkHeap(&slowQuery.user, c)

// Data smaller than heap top will not be inserted.
slowQuery.Append(&SlowQueryInfo{Duration: 1200 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond)
slowQuery.Append(&SlowQueryInfo{Duration: 666 * time.Millisecond})
c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond)
}

func (t *testTopNSlowQuerySuite) TestRefresh(c *C) {
now := time.Now()
slowQuery := newTopNSlowQueries(6, 3*time.Second)

slowQuery.Append(&SlowQueryInfo{Start: now, Duration: 6})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(1 * time.Second), Duration: 5})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(2 * time.Second), Duration: 4})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(3 * time.Second), Duration: 3})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(4 * time.Second), Duration: 2})
c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond)

slowQuery.Refresh(now.Add(5 * time.Second))
c.Assert(len(slowQuery.user.data), Equals, 2)
c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond)

slowQuery.Append(&SlowQueryInfo{Start: now.Add(3 * time.Second), Duration: 3})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(4 * time.Second), Duration: 2})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(5 * time.Second), Duration: 1})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(6 * time.Second), Duration: 0})
c.Assert(len(slowQuery.user.data), Equals, 6)
c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond)

slowQuery.Refresh(now.Add(6 * time.Second))
c.Assert(len(slowQuery.user.data), Equals, 4)
c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond)
}

func checkHeap(q *slowQueryHeap, c *C) {
for i := 0; i < len(q.data); i++ {
left := 2*i + 1
right := 2*i + 2
if left < len(q.data) {
c.Assert(q.data[i].Duration, LessEqual, q.data[left].Duration)
}
if right < len(q.data) {
c.Assert(q.data[i].Duration, LessEqual, q.data[right].Duration)
}
}
}
Loading

0 comments on commit 2758da5

Please sign in to comment.