diff --git a/go.mod b/go.mod index 6a60bdbeb67df..96c17c8d37f8b 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e - github.com/pingcap/tipb v0.0.0-20211201080053-bd104bb270ba + github.com/pingcap/tipb v0.0.0-20211227115224-a06a85f9d2a5 github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 @@ -79,12 +79,12 @@ require ( go.uber.org/goleak v1.1.12 go.uber.org/multierr v1.7.0 go.uber.org/zap v1.19.1 - golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420 + golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e + golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e golang.org/x/text v0.3.7 - golang.org/x/tools v0.1.5 + golang.org/x/tools v0.1.8 google.golang.org/api v0.54.0 google.golang.org/grpc v1.40.0 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect diff --git a/go.sum b/go.sum index 087c367fd7482..608194796decd 100644 --- a/go.sum +++ b/go.sum @@ -598,8 +598,8 @@ github.com/pingcap/tidb-dashboard v0.0.0-20211008050453-a25c25809529/go.mod h1:O github.com/pingcap/tidb-dashboard v0.0.0-20211107164327-80363dfbe884/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ= github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible h1:c7+izmker91NkjkZ6FgTlmD4k1A5FLOAq+li6Ki2/GY= github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb v0.0.0-20211201080053-bd104bb270ba h1:Tt5W/maVBUbG+wxg2nfc88Cqj/HiWYb0TJQ2Rfi0UOQ= -github.com/pingcap/tipb v0.0.0-20211201080053-bd104bb270ba/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tipb v0.0.0-20211227115224-a06a85f9d2a5 h1:oKL5kSHoQAI9o6mdbxU0cB7fvqm9KyXZs9s/p0kZRnY= +github.com/pingcap/tipb v0.0.0-20211227115224-a06a85f9d2a5/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -776,6 +776,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= @@ -883,6 +884,7 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.5.1/go.mod 
h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -928,8 +930,9 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420 h1:a8jGStKg0XqKDlKqjLrXn0ioF5MH36pT7Z0BRTqLhbk= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1022,8 +1025,10 @@ golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e h1:XMgFehsDnnLGtjvjOfqWSUzt0alpTR1RSEuznObga2c= golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1111,8 +1116,9 @@ golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.8 h1:P1HhGGuLW4aAclzjtmJdf0mJOjVUZUzOTqkAkWL+l6w= +golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/util/topsql/reporter/datamodel.go b/util/topsql/reporter/datamodel.go new file mode 100644 index 0000000000000..38d8f627d27ca --- /dev/null +++ b/util/topsql/reporter/datamodel.go @@ -0,0 +1,695 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reporter + +import ( + "bytes" + "sort" + "sync" + "sync/atomic" + + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/topsql/stmtstats" + "github.com/pingcap/tidb/util/topsql/tracecpu" + "github.com/pingcap/tipb/go-tipb" + "github.com/wangjohn/quickselect" + atomic2 "go.uber.org/atomic" + "go.uber.org/zap" +) + +// Data naming and relationship in datamodel.go: +// +// tsItem: +// - timestamp +// - cpuTime +// - stmtStats(execCount, durationSum, ...) +// +// tsItems: [ tsItem | tsItem | tsItem | ... ] +// +// record: +// - tsItems: [ tsItem | tsItem | tsItem | ... ] +// - tsIndex: { 1640500000 => 0 | 1640500001 => 1 | 1640500002 => 2 | ... } +// +// records: [ record | record | record | ... ] +// +// collecting: +// - records: { sqlPlanDigest => record | sqlPlanDigest => record | ... } +// - evicted: { sqlPlanDigest | sqlPlanDigest | ... } +// +// cpuRecords: [ tracecpu.SQLCPUTimeRecord | tracecpu.SQLCPUTimeRecord | ... ] +// +// normalizeSQLMap: { sqlDigest => normalizedSQL | sqlDigest => normalizedSQL | ... } +// +// normalizePlanMap: { planDigest => normalizedPlan | planDigest => normalizedPlan | ... } + +const ( + // keyOthers is the key to store the aggregation of all records that is out of Top N. + keyOthers = "" + + // maxTsItemsCapacity is a protection to avoid excessive memory usage caused by + // incorrect configuration. The corresponding value defaults to 60 (60 s/min). + maxTsItemsCapacity = 1000 +) + +// tsItem is a self-contained complete piece of data for a certain timestamp. +type tsItem struct { + timestamp uint64 + cpuTimeMs uint32 + stmtStats stmtstats.StatementStatsItem +} + +func zeroTsItem() tsItem { + return tsItem{ + stmtStats: stmtstats.StatementStatsItem{ + KvStatsItem: stmtstats.KvStatementStatsItem{ + KvExecCount: map[string]uint64{}, + }, + }, + } +} + +// toProto converts the tsItem to the corresponding protobuf representation. +func (i *tsItem) toProto() *tipb.TopSQLRecordItem { + return &tipb.TopSQLRecordItem{ + TimestampSec: i.timestamp, + CpuTimeMs: i.cpuTimeMs, + StmtExecCount: i.stmtStats.ExecCount, + StmtKvExecCount: i.stmtStats.KvStatsItem.KvExecCount, + // TODO: add duration + // StmtDurationSumNs: i.stmtStats.DurationSumNs, + // Convert more indicators here. + } +} + +var _ sort.Interface = &tsItems{} + +// tsItems is a sortable list of tsItem, sort by tsItem.timestamp (asc). 
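As a quick illustration of the data layout described above, here is a minimal in-package sketch (the function name is hypothetical and not part of this patch) showing how a single tsItem maps onto tipb.TopSQLRecordItem:

// Hypothetical sketch, assumed to live in package reporter next to these types.
func exampleTsItemToProto() *tipb.TopSQLRecordItem {
	item := zeroTsItem()        // KvExecCount map comes pre-initialized
	item.timestamp = 1640500000 // unix seconds
	item.cpuTimeMs = 25
	item.stmtStats.ExecCount = 3
	return item.toProto() // TimestampSec=1640500000, CpuTimeMs=25, StmtExecCount=3
}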
+type tsItems []tsItem + +func (ts tsItems) Len() int { + return len(ts) +} + +func (ts tsItems) Less(i, j int) bool { + return ts[i].timestamp < ts[j].timestamp +} + +func (ts tsItems) Swap(i, j int) { + ts[i], ts[j] = ts[j], ts[i] +} + +func (ts tsItems) sorted() bool { + for n := 0; n < len(ts)-1; n++ { + if ts[n].timestamp > ts[n+1].timestamp { + return false + } + } + return true +} + +// toProto converts the tsItems to the corresponding protobuf representation. +func (ts tsItems) toProto() []*tipb.TopSQLRecordItem { + capacity := len(ts) + if capacity == 0 { + return nil + } + items := make([]*tipb.TopSQLRecordItem, 0, capacity) + for _, i := range ts { + items = append(items, i.toProto()) + } + return items +} + +var _ sort.Interface = &record{} + +// record represents the cumulative tsItem in current minute window. +// record do not guarantee the tsItems is sorted by timestamp when there is a time jump backward. +// record is also sortable, and the tsIndex will be updated while sorting the internal tsItems. +type record struct { + sqlDigest []byte + planDigest []byte + totalCPUTimeMs uint64 + tsItems tsItems + + // tsIndex is used to quickly find the corresponding tsItems index through timestamp. + tsIndex map[uint64]int // timestamp => index of tsItems +} + +func newRecord(sqlDigest, planDigest []byte) *record { + listCap := variable.TopSQLVariable.ReportIntervalSeconds.Load()/variable.TopSQLVariable.PrecisionSeconds.Load() + 1 + if listCap > maxTsItemsCapacity { + listCap = maxTsItemsCapacity + } + return &record{ + sqlDigest: sqlDigest, + planDigest: planDigest, + tsItems: make(tsItems, 0, listCap), + tsIndex: make(map[uint64]int, listCap), + } +} + +func (r *record) Len() int { + return r.tsItems.Len() +} + +func (r *record) Less(i, j int) bool { + return r.tsItems.Less(i, j) +} + +func (r *record) Swap(i, j int) { + // before swap: + // timestamps: [10000, 10001, 10002] + // tsIndex: [10000 => 0, 10001 => 1, 10002 => 2] + // + // let i = 0, j = 1 + // after swap tsIndex: + // timestamps: [10000, 10001, 10002] + // tsIndex: [10000 => 1, 10001 => 0, 10002 => 2] + // + // after swap tsItems: + // timestamps: [10001, 10000, 10002] + // tsIndex: [10000 => 1, 10001 => 0, 10002 => 2] + r.tsIndex[r.tsItems[i].timestamp], r.tsIndex[r.tsItems[j].timestamp] = r.tsIndex[r.tsItems[j].timestamp], r.tsIndex[r.tsItems[i].timestamp] + r.tsItems.Swap(i, j) +} + +// appendCPUTime appends a cpuTime under a certain timestamp to record. +// If the timestamp already exists in tsItems, then cpuTime will be added. +func (r *record) appendCPUTime(timestamp uint64, cpuTimeMs uint32) { + if index, ok := r.tsIndex[timestamp]; ok { + // For the same timestamp, we have already called appendStmtStatsItem, + // r.tsItems already exists the corresponding timestamp, and the corresponding + // cpuTimeMs has been set to 0 (or other values, although impossible), so we add it. + // + // let timestamp = 10000, cpuTimeMs = 123 + // + // Before: + // tsIndex: [10000 => 0] + // tsItems: + // timestamp: [10000] + // cpuTimeMs: [0] + // stmtStats.ExecCount: [?] + // stmtStats.KvExecCount: [map{"?": ?}] + // stmtStats.DurationSum: [?] + // + // After: + // tsIndex: [10000 => 0] + // tsItems: + // timestamp: [10000] + // cpuTimeMs: [123] + // stmtStats.ExecCount: [?] + // stmtStats.KvExecCount: [map{"?": ?}] + // stmtStats.DurationSum: [?] + // + r.tsItems[index].cpuTimeMs += cpuTimeMs + } else { + // For this timestamp, we have not appended any tsItem, so append it directly. 
+ // Other fields in tsItem except cpuTimeMs will be initialized to 0. + // + // let timestamp = 10000, cpu_time = 123 + // + // Before: + // tsIndex: [] + // tsItems: + // timestamp: [] + // cpuTimeMs: [] + // stmtStats.ExecCount: [] + // stmtStats.KvExecCount: [] + // stmtStats.DurationSum: [] + // + // After: + // tsIndex: [10000 => 0] + // tsItems: + // timestamp: [10000] + // cpuTimeMs: [123] + // stmtStats.ExecCount: [0] + // stmtStats.KvExecCount: [map{}] + // stmtStats.DurationSum: [0] + // + newItem := zeroTsItem() + newItem.timestamp = timestamp + newItem.cpuTimeMs = cpuTimeMs + r.tsIndex[timestamp] = len(r.tsItems) + r.tsItems = append(r.tsItems, newItem) + } + r.totalCPUTimeMs += uint64(cpuTimeMs) +} + +// appendStmtStatsItem appends a stmtstats.StatementStatsItem under a certain timestamp to record. +// If the timestamp already exists in tsItems, then stmtstats.StatementStatsItem will be merged. +func (r *record) appendStmtStatsItem(timestamp uint64, item stmtstats.StatementStatsItem) { + if index, ok := r.tsIndex[timestamp]; ok { + // For the same timestamp, we have already called appendCPUTime, + // r.tsItems already exists the corresponding timestamp, and the + // corresponding stmtStats has been set to 0 (or other values, + // although impossible), so we merge it. + // + // let timestamp = 10000, execCount = 123, kvExecCount = map{"1.1.1.1:1": 123}, durationSum = 456 + // + // Before: + // tsIndex: [10000 => 0] + // tsItems: + // timestamp: [10000] + // cpuTimeMs: [?] + // stmtStats.ExecCount: [0] + // stmtStats.KvExecCount: [map{}] + // stmtStats.DurationSum: [0] + // + // After: + // tsIndex: [10000 => 0] + // tsItems: + // timestamp: [10000] + // cpuTimeMs: [?] + // stmtStats.ExecCount: [123] + // stmtStats.KvExecCount: [map{"1.1.1.1:1": 123}] + // stmtStats.DurationSum: [456] + // + r.tsItems[index].stmtStats.Merge(&item) + } else { + // For this timestamp, we have not appended any tsItem, so append it directly. + // Other fields in tsItem except stmtStats will be initialized to 0. + // + // let timestamp = 10000, execCount = 123, kvExecCount = map{"1.1.1.1:1": 123}, durationSum = 456 + // + // Before: + // tsIndex: [] + // tsItems: + // timestamp: [] + // cpuTimeMs: [] + // stmtStats.ExecCount: [] + // stmtStats.KvExecCount: [] + // stmtStats.DurationSum: [] + // + // After: + // tsIndex: [10000 => 0] + // tsItems: + // timestamp: [10000] + // cpuTimeMs: [0] + // stmtStats.ExecCount: [123] + // stmtStats.KvExecCount: [map{"1.1.1.1:1": 123}] + // stmtStats.DurationSum: [456] + // + newItem := zeroTsItem() + newItem.timestamp = timestamp + newItem.stmtStats = item + r.tsIndex[timestamp] = len(r.tsItems) + r.tsItems = append(r.tsItems, newItem) + } +} + +// merge other record into r. +// Attention, this function depend on r is sorted, and will sort `other` by timestamp. 
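Before moving on to merge below, a hedged sketch of how the two append paths land on the same tsItem (hypothetical function, assumed to be in package reporter; it mirrors the behavior walked through in the comments above):

func exampleRecordAppend() {
	r := newRecord([]byte("sql-digest"), []byte("plan-digest")) // placeholder digests
	r.appendCPUTime(10000, 123)                                 // creates the ts=10000 item with cpuTimeMs=123
	r.appendStmtStatsItem(10000, stmtstats.StatementStatsItem{ExecCount: 1}) // merged into the same item
	// r.tsItems now holds a single item {timestamp: 10000, cpuTimeMs: 123, ExecCount: 1},
	// and r.totalCPUTimeMs == 123 (only appendCPUTime contributes to the total).
}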
+func (r *record) merge(other *record) { + if other == nil || len(other.tsItems) == 0 { + return + } + + if !other.tsItems.sorted() { + sort.Sort(other) // this may never happen + } + if len(r.tsItems) == 0 { + r.totalCPUTimeMs = other.totalCPUTimeMs + r.tsItems = other.tsItems + r.tsIndex = other.tsIndex + return + } + length := len(r.tsItems) + len(other.tsItems) + newTsItems := make(tsItems, 0, length) + i, j := 0, 0 + for i < len(r.tsItems) && j < len(other.tsItems) { + if r.tsItems[i].timestamp == other.tsItems[j].timestamp { + newItem := zeroTsItem() + newItem.timestamp = r.tsItems[i].timestamp + newItem.cpuTimeMs = r.tsItems[i].cpuTimeMs + other.tsItems[j].cpuTimeMs + r.tsItems[i].stmtStats.Merge(&other.tsItems[j].stmtStats) + newItem.stmtStats = r.tsItems[i].stmtStats + newTsItems = append(newTsItems, newItem) + i++ + j++ + } else if r.tsItems[i].timestamp < other.tsItems[j].timestamp { + newItem := zeroTsItem() + newItem.timestamp = r.tsItems[i].timestamp + newItem.cpuTimeMs = r.tsItems[i].cpuTimeMs + newItem.stmtStats = r.tsItems[i].stmtStats + newTsItems = append(newTsItems, newItem) + i++ + } else { + newItem := zeroTsItem() + newItem.timestamp = other.tsItems[j].timestamp + newItem.cpuTimeMs = other.tsItems[j].cpuTimeMs + newItem.stmtStats = other.tsItems[j].stmtStats + newTsItems = append(newTsItems, newItem) + j++ + } + } + if i < len(r.tsItems) { + newTsItems = append(newTsItems, r.tsItems[i:]...) + } + if j < len(other.tsItems) { + newTsItems = append(newTsItems, other.tsItems[j:]...) + } + r.tsItems = newTsItems + r.totalCPUTimeMs += other.totalCPUTimeMs + r.rebuildTsIndex() +} + +// rebuildTsIndex rebuilds the entire tsIndex based on tsItems. +func (r *record) rebuildTsIndex() { + if len(r.tsItems) == 0 { + r.tsIndex = map[uint64]int{} + return + } + r.tsIndex = make(map[uint64]int, len(r.tsItems)) + for index, item := range r.tsItems { + r.tsIndex[item.timestamp] = index + } +} + +// toProto converts the record to the corresponding protobuf representation. +func (r *record) toProto() tipb.TopSQLRecord { + return tipb.TopSQLRecord{ + SqlDigest: r.sqlDigest, + PlanDigest: r.planDigest, + Items: r.tsItems.toProto(), + } +} + +var _ sort.Interface = &records{} + +// records is a sortable list of record, sort by record.totalCPUTimeMs (desc). +type records []record + +func (rs records) Len() int { + return len(rs) +} + +func (rs records) Less(i, j int) bool { + // Order by totalCPUTimeMs **DESC**. + return rs[i].totalCPUTimeMs > rs[j].totalCPUTimeMs +} + +func (rs records) Swap(i, j int) { + rs[i], rs[j] = rs[j], rs[i] +} + +// topN returns the largest n records (by record.totalCPUTimeMs), other +// records are returned as evicted. +func (rs records) topN(n int) (top, evicted records) { + if len(rs) <= n { + return rs, nil + } + if err := quickselect.QuickSelect(rs, n); err != nil { + return rs, nil + } + return rs[:n], rs[n:] +} + +// toProto converts the records to the corresponding protobuf representation. +func (rs records) toProto() []tipb.TopSQLRecord { + pb := make([]tipb.TopSQLRecord, 0, len(rs)) + for _, r := range rs { + pb = append(pb, r.toProto()) + } + return pb +} + +// collecting includes the collection of data being collected by the reporter. 
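A hedged sketch of the merge semantics defined above (hypothetical function, assumed in package reporter; an overlapping timestamp is summed, disjoint ones are interleaved in order):

func exampleRecordMerge() {
	a, b := newRecord(nil, nil), newRecord(nil, nil)
	a.appendCPUTime(1, 10)
	a.appendCPUTime(2, 20)
	b.appendCPUTime(2, 5) // overlaps with a at timestamp 2
	b.appendCPUTime(3, 30)
	a.merge(b)
	// a.tsItems: ts=1 (10ms), ts=2 (25ms), ts=3 (30ms); a.totalCPUTimeMs == 65.
}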
+type collecting struct { + records map[string]*record // sqlPlanDigest => record + evicted map[uint64]map[string]struct{} // { sqlPlanDigest } + keyBuf *bytes.Buffer +} + +func newCollecting() *collecting { + return &collecting{ + records: map[string]*record{}, + evicted: map[uint64]map[string]struct{}{}, + keyBuf: bytes.NewBuffer(make([]byte, 0, 64)), + } +} + +// getOrCreateRecord gets the record corresponding to sqlDigest + planDigest, if it +// does not exist, it will be created. +func (c *collecting) getOrCreateRecord(sqlDigest, planDigest []byte) *record { + key := encodeKey(c.keyBuf, sqlDigest, planDigest) + r, ok := c.records[key] + if !ok { + r = newRecord(sqlDigest, planDigest) + c.records[key] = r + } + return r +} + +// markAsEvicted marks sqlDigest + planDigest under a certain timestamp as "evicted". +// Later, we can determine whether a certain sqlDigest + planDigest within a certain +// timestamp has been evicted. +func (c *collecting) markAsEvicted(timestamp uint64, sqlDigest, planDigest []byte) { + if _, ok := c.evicted[timestamp]; !ok { + c.evicted[timestamp] = map[string]struct{}{} + } + c.evicted[timestamp][encodeKey(c.keyBuf, sqlDigest, planDigest)] = struct{}{} +} + +// hasEvicted determines whether a certain sqlDigest + planDigest has been evicted +// in a certain timestamp. +func (c *collecting) hasEvicted(timestamp uint64, sqlDigest, planDigest []byte) bool { + if digestSet, ok := c.evicted[timestamp]; ok { + if _, ok := digestSet[encodeKey(c.keyBuf, sqlDigest, planDigest)]; ok { + return true + } + } + return false +} + +// appendOthersCPUTime appends totalCPUTimeMs to a special record named "others". +func (c *collecting) appendOthersCPUTime(timestamp uint64, totalCPUTimeMs uint32) { + if totalCPUTimeMs == 0 { + return + } + others, ok := c.records[keyOthers] + if !ok { + others = newRecord(nil, nil) + c.records[keyOthers] = others + } + others.appendCPUTime(timestamp, totalCPUTimeMs) +} + +// appendOthersStmtStatsItem appends stmtstats.StatementStatsItem to a special record named "others". +func (c *collecting) appendOthersStmtStatsItem(timestamp uint64, item stmtstats.StatementStatsItem) { + others, ok := c.records[keyOthers] + if !ok { + others = newRecord(nil, nil) + c.records[keyOthers] = others + } + others.appendStmtStatsItem(timestamp, item) +} + +// compactToTopNAndOthers returns the largest N records, other records will be packed and appended to the end. +func (c *collecting) compactToTopNAndOthers(n int) records { + others := c.records[keyOthers] + delete(c.records, keyOthers) + rs := make(records, 0, len(c.records)) + for _, v := range c.records { + rs = append(rs, *v) + } + // Fetch TopN records. + var evicted records + rs, evicted = rs.topN(n) + if others != nil { + // Sort the records by timestamp to fix the affect of time jump backward. + sort.Sort(others) + } else { + others = newRecord(nil, nil) + } + for _, evict := range evicted { + e := evict // Avoid implicit memory aliasing in for loop. + others.merge(&e) + } + if others.totalCPUTimeMs > 0 { + // append others which summarize all evicted item's cpu-time. + rs = append(rs, *others) + } + return rs +} + +// take away all data inside collecting, put them in the returned new collecting. 
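A hedged end-to-end sketch of the collecting flow above, from per-digest accumulation to the compacted Top N plus "others" (hypothetical function, assumed in package reporter):

func exampleCompactToTopN() {
	c := newCollecting()
	c.getOrCreateRecord([]byte("sql-1"), []byte("plan-1")).appendCPUTime(1, 1)
	c.getOrCreateRecord([]byte("sql-2"), []byte("plan-2")).appendCPUTime(1, 2)
	c.getOrCreateRecord([]byte("sql-3"), []byte("plan-3")).appendCPUTime(1, 3)
	rs := c.compactToTopNAndOthers(1)
	_ = rs // rs[0] is the sql-3 record (3 ms); rs[1] is "others", aggregating sql-1 and sql-2 (1+2 ms).
}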
+func (c *collecting) take() *collecting { + r := &collecting{ + records: c.records, + evicted: c.evicted, + keyBuf: bytes.NewBuffer(make([]byte, 0, 64)), + } + c.records = map[string]*record{} + c.evicted = map[uint64]map[string]struct{}{} + return r +} + +// cpuRecords is a sortable list of tracecpu.SQLCPUTimeRecord, sort by CPUTimeMs (desc). +type cpuRecords []tracecpu.SQLCPUTimeRecord + +func (rs cpuRecords) Len() int { + return len(rs) +} + +func (rs cpuRecords) Less(i, j int) bool { + // Order by CPUTimeMs **DESC**. + return rs[i].CPUTimeMs > rs[j].CPUTimeMs +} + +func (rs cpuRecords) Swap(i, j int) { + rs[i], rs[j] = rs[j], rs[i] +} + +// topN returns the largest n cpuRecords (by CPUTimeMs), other cpuRecords are returned as evicted. +func (rs cpuRecords) topN(n int) (top, evicted cpuRecords) { + if len(rs) <= n { + return rs, nil + } + if err := quickselect.QuickSelect(rs, n); err != nil { + return rs, nil + } + return rs[:n], rs[n:] +} + +// sqlMeta is the SQL meta which contains the normalized SQL string and a bool +// field which uses to distinguish internal SQL. +type sqlMeta struct { + normalizedSQL string + isInternal bool +} + +// normalizedSQLMap is a wrapped map used to register normalizedSQL. +type normalizedSQLMap struct { + data atomic.Value // *sync.Map + length atomic2.Int64 +} + +func newNormalizedSQLMap() *normalizedSQLMap { + r := &normalizedSQLMap{} + r.data.Store(&sync.Map{}) + return r +} + +// register saves the relationship between sqlDigest and normalizedSQL. +// If the internal map size exceeds the limit, the relationship will be discarded. +func (m *normalizedSQLMap) register(sqlDigest []byte, normalizedSQL string, isInternal bool) { + if m.length.Load() >= variable.TopSQLVariable.MaxCollect.Load() { + ignoreExceedSQLCounter.Inc() + return + } + data := m.data.Load().(*sync.Map) + _, loaded := data.LoadOrStore(string(sqlDigest), sqlMeta{ + normalizedSQL: normalizedSQL, + isInternal: isInternal, + }) + if !loaded { + m.length.Add(1) + } +} + +// take away all data inside normalizedSQLMap, put them in the returned new normalizedSQLMap. +func (m *normalizedSQLMap) take() *normalizedSQLMap { + data := m.data.Load().(*sync.Map) + length := m.length.Load() + r := &normalizedSQLMap{} + r.data.Store(data) + r.length.Store(length) + m.data.Store(&sync.Map{}) + m.length.Store(0) + return r +} + +// toProto converts the normalizedSQLMap to the corresponding protobuf representation. +func (m *normalizedSQLMap) toProto() []tipb.SQLMeta { + metas := make([]tipb.SQLMeta, 0, m.length.Load()) + m.data.Load().(*sync.Map).Range(func(k, v interface{}) bool { + meta := v.(sqlMeta) + metas = append(metas, tipb.SQLMeta{ + SqlDigest: []byte(k.(string)), + NormalizedSql: meta.normalizedSQL, + IsInternalSql: meta.isInternal, + }) + return true + }) + return metas +} + +// planBinaryDecodeFunc is used to decode the value when converting +// normalizedPlanMap to protobuf representation. +type planBinaryDecodeFunc func(string) (string, error) + +// normalizedSQLMap is a wrapped map used to register normalizedPlan. +type normalizedPlanMap struct { + data atomic.Value // *sync.Map + length atomic2.Int64 +} + +func newNormalizedPlanMap() *normalizedPlanMap { + r := &normalizedPlanMap{} + r.data.Store(&sync.Map{}) + return r +} + +// register saves the relationship between planDigest and normalizedPlan. +// If the internal map size exceeds the limit, the relationship will be discarded. 
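The register/take pattern above (an atomic.Value holding a *sync.Map that is swapped out wholesale) can be exercised as in this hedged sketch (hypothetical function, assumed in package reporter; MaxCollect is raised the same way the tests do):

func exampleSQLMapTake() {
	variable.TopSQLVariable.MaxCollect.Store(999) // avoid hitting the size limit in the sketch
	m := newNormalizedSQLMap()
	m.register([]byte("sql-digest"), "select * from t where id = ?", false)
	taken := m.take()   // taken owns the old *sync.Map; m starts over empty
	_ = taken.toProto() // one tipb.SQLMeta
	_ = m.length.Load() // 0
}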
+func (m *normalizedPlanMap) register(planDigest []byte, normalizedPlan string) { + if m.length.Load() >= variable.TopSQLVariable.MaxCollect.Load() { + ignoreExceedPlanCounter.Inc() + return + } + data := m.data.Load().(*sync.Map) + _, loaded := data.LoadOrStore(string(planDigest), normalizedPlan) + if !loaded { + m.length.Add(1) + } +} + +// take away all data inside normalizedPlanMap, put them in the returned new normalizedPlanMap. +func (m *normalizedPlanMap) take() *normalizedPlanMap { + data := m.data.Load().(*sync.Map) + length := m.length.Load() + r := &normalizedPlanMap{} + r.data.Store(data) + r.length.Store(length) + m.data.Store(&sync.Map{}) + m.length.Store(0) + return r +} + +// toProto converts the normalizedPlanMap to the corresponding protobuf representation. +func (m *normalizedPlanMap) toProto(decodePlan planBinaryDecodeFunc) []tipb.PlanMeta { + metas := make([]tipb.PlanMeta, 0, m.length.Load()) + m.data.Load().(*sync.Map).Range(func(k, v interface{}) bool { + planDecoded, errDecode := decodePlan(v.(string)) + if errDecode != nil { + logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode)) + return true + } + metas = append(metas, tipb.PlanMeta{ + PlanDigest: []byte(k.(string)), + NormalizedPlan: planDecoded, + }) + return true + }) + return metas +} + +func encodeKey(buf *bytes.Buffer, sqlDigest, planDigest []byte) string { + buf.Reset() + buf.Write(sqlDigest) + buf.Write(planDigest) + return buf.String() +} diff --git a/util/topsql/reporter/datamodel_test.go b/util/topsql/reporter/datamodel_test.go new file mode 100644 index 0000000000000..d65bf6ddb2a3d --- /dev/null +++ b/util/topsql/reporter/datamodel_test.go @@ -0,0 +1,466 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
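Before the tests, a hedged sketch of how the plan map defers decoding to the supplied callback and how record keys are built by encodeKey (hypothetical function, assumed in package reporter; the identity decoder stands in for the real plan decoder):

func examplePlanMapAndKeys() {
	variable.TopSQLVariable.MaxCollect.Store(999)
	m := newNormalizedPlanMap()
	m.register([]byte("plan-digest"), "encoded-plan") // stored as-is until toProto
	metas := m.toProto(func(encoded string) (string, error) { return encoded, nil })
	_ = metas // one tipb.PlanMeta with NormalizedPlan == "encoded-plan"

	buf := bytes.NewBuffer(make([]byte, 0, 64))
	_ = encodeKey(buf, []byte("sql-digest"), []byte("plan-digest")) // "sql-digestplan-digest"
}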
+ +package reporter + +import ( + "bytes" + "sort" + "sync" + "testing" + + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/topsql/stmtstats" + "github.com/pingcap/tipb/go-tipb" + "github.com/stretchr/testify/assert" +) + +func Test_tsItem_toProto(t *testing.T) { + item := &tsItem{ + timestamp: 1, + cpuTimeMs: 2, + stmtStats: stmtstats.StatementStatsItem{ + ExecCount: 3, + KvStatsItem: stmtstats.KvStatementStatsItem{KvExecCount: map[string]uint64{"": 4}}, + }, + } + pb := item.toProto() + assert.Equal(t, uint64(1), pb.TimestampSec) + assert.Equal(t, uint32(2), pb.CpuTimeMs) + assert.Equal(t, uint64(3), pb.StmtExecCount) + assert.Equal(t, uint64(4), pb.StmtKvExecCount[""]) +} + +func Test_tsItems_Sort(t *testing.T) { + items := tsItems{} + assert.True(t, items.sorted()) + items = nil + assert.True(t, items.sorted()) + items = tsItems{ + {timestamp: 2}, + {timestamp: 3}, + {timestamp: 1}, + } + assert.False(t, items.sorted()) + sort.Sort(items) + assert.True(t, items.sorted()) + assert.Equal(t, uint64(1), items[0].timestamp) + assert.Equal(t, uint64(2), items[1].timestamp) + assert.Equal(t, uint64(3), items[2].timestamp) +} + +func Test_tsItems_toProto(t *testing.T) { + items := &tsItems{{}, {}, {}} + pb := items.toProto() + assert.Len(t, pb, 3) +} + +func Test_record_Sort(t *testing.T) { + r := record{ + tsItems: tsItems{ + {timestamp: 2}, + {timestamp: 3}, + {timestamp: 1}, + }, + tsIndex: map[uint64]int{ + 2: 0, + 3: 1, + 1: 2, + }, + } + sort.Sort(&r) + assert.Equal(t, uint64(1), r.tsItems[0].timestamp) + assert.Equal(t, uint64(2), r.tsItems[1].timestamp) + assert.Equal(t, uint64(3), r.tsItems[2].timestamp) + assert.Equal(t, 0, r.tsIndex[1]) + assert.Equal(t, 1, r.tsIndex[2]) + assert.Equal(t, 2, r.tsIndex[3]) +} + +func Test_record_append(t *testing.T) { + r := newRecord(nil, nil) + // TimestampList: [] + // CPUTimeList: [] + // ExecCountList: [] + + r.appendCPUTime(1, 1) + // TimestampList: [1] + // CPUTimeList: [1] + // ExecCountList: [0] + + r.appendStmtStatsItem(1, stmtstats.StatementStatsItem{ExecCount: 1}) + // TimestampList: [1] + // CPUTimeList: [1] + // ExecCountList: [1] + + r.appendCPUTime(2, 1) + // TimestampList: [1, 2] + // CPUTimeList: [1, 1] + // ExecCountList: [1, 0] + + r.appendCPUTime(3, 1) + // TimestampList: [1, 2, 3] + // CPUTimeList: [1, 1, 1] + // ExecCountList: [1, 0, 0] + + r.appendStmtStatsItem(3, stmtstats.StatementStatsItem{ExecCount: 1}) + // TimestampList: [1, 2, 3] + // CPUTimeList: [1, 1, 1] + // ExecCountList: [1, 0, 1] + + r.appendStmtStatsItem(2, stmtstats.StatementStatsItem{ExecCount: 1}) + // TimestampList: [1, 2, 3] + // CPUTimeList: [1, 1, 1] + // ExecCountList: [1, 1, 1] + + assert.Len(t, r.tsItems, 3) + assert.Len(t, r.tsIndex, 3) + assert.Equal(t, uint64(3), r.totalCPUTimeMs) + assert.Equal(t, uint64(1), r.tsItems[0].timestamp) + assert.Equal(t, uint64(2), r.tsItems[1].timestamp) + assert.Equal(t, uint64(3), r.tsItems[2].timestamp) + assert.Equal(t, uint32(1), r.tsItems[0].cpuTimeMs) + assert.Equal(t, uint32(1), r.tsItems[1].cpuTimeMs) + assert.Equal(t, uint32(1), r.tsItems[2].cpuTimeMs) + assert.Equal(t, uint64(1), r.tsItems[0].stmtStats.ExecCount) + assert.Equal(t, uint64(1), r.tsItems[1].stmtStats.ExecCount) + assert.Equal(t, uint64(1), r.tsItems[2].stmtStats.ExecCount) +} + +func Test_record_merge(t *testing.T) { + r1 := record{ + totalCPUTimeMs: 1 + 2 + 3, + tsItems: tsItems{ + {timestamp: 1, cpuTimeMs: 1, stmtStats: *stmtstats.NewStatementStatsItem()}, + {timestamp: 2, cpuTimeMs: 2, stmtStats: 
*stmtstats.NewStatementStatsItem()}, + {timestamp: 3, cpuTimeMs: 3, stmtStats: *stmtstats.NewStatementStatsItem()}, + }, + } + r1.rebuildTsIndex() + r2 := record{ + totalCPUTimeMs: 6 + 5 + 4, + tsItems: tsItems{ + {timestamp: 6, cpuTimeMs: 6, stmtStats: *stmtstats.NewStatementStatsItem()}, + {timestamp: 5, cpuTimeMs: 5, stmtStats: *stmtstats.NewStatementStatsItem()}, + {timestamp: 4, cpuTimeMs: 4, stmtStats: *stmtstats.NewStatementStatsItem()}, + }, + } + r2.rebuildTsIndex() + r1.merge(&r2) + assert.Equal(t, uint64(4), r2.tsItems[0].timestamp) + assert.Equal(t, uint64(5), r2.tsItems[1].timestamp) + assert.Equal(t, uint64(6), r2.tsItems[2].timestamp) + assert.Len(t, r1.tsItems, 6) + assert.Len(t, r1.tsIndex, 6) + assert.Equal(t, uint64(1), r1.tsItems[0].timestamp) + assert.Equal(t, uint64(2), r1.tsItems[1].timestamp) + assert.Equal(t, uint64(3), r1.tsItems[2].timestamp) + assert.Equal(t, uint64(4), r1.tsItems[3].timestamp) + assert.Equal(t, uint64(5), r1.tsItems[4].timestamp) + assert.Equal(t, uint64(6), r1.tsItems[5].timestamp) + assert.Equal(t, uint64(1+2+3+4+5+6), r1.totalCPUTimeMs) +} + +func Test_record_rebuildTsIndex(t *testing.T) { + r := record{tsIndex: map[uint64]int{1: 1}} + r.rebuildTsIndex() + assert.Empty(t, r.tsIndex) + r.tsItems = tsItems{ + {timestamp: 1, cpuTimeMs: 1}, + {timestamp: 2, cpuTimeMs: 2}, + {timestamp: 3, cpuTimeMs: 3}, + } + r.rebuildTsIndex() + assert.Len(t, r.tsIndex, 3) + assert.Equal(t, 0, r.tsIndex[1]) + assert.Equal(t, 1, r.tsIndex[2]) + assert.Equal(t, 2, r.tsIndex[3]) +} + +func Test_record_toProto(t *testing.T) { + r := record{ + sqlDigest: []byte("SQL-1"), + planDigest: []byte("PLAN-1"), + totalCPUTimeMs: 123, + tsItems: tsItems{{}, {}, {}}, + } + pb := r.toProto() + assert.Equal(t, []byte("SQL-1"), pb.SqlDigest) + assert.Equal(t, []byte("PLAN-1"), pb.PlanDigest) + assert.Len(t, pb.Items, 3) +} + +func Test_records_Sort(t *testing.T) { + rs := records{ + {totalCPUTimeMs: 1}, + {totalCPUTimeMs: 3}, + {totalCPUTimeMs: 2}, + } + sort.Sort(rs) + assert.Equal(t, uint64(3), rs[0].totalCPUTimeMs) + assert.Equal(t, uint64(2), rs[1].totalCPUTimeMs) + assert.Equal(t, uint64(1), rs[2].totalCPUTimeMs) +} + +func Test_records_topN(t *testing.T) { + rs := records{ + {totalCPUTimeMs: 1}, + {totalCPUTimeMs: 3}, + {totalCPUTimeMs: 2}, + } + top, evicted := rs.topN(2) + assert.Len(t, top, 2) + assert.Len(t, evicted, 1) + assert.Equal(t, uint64(3), top[0].totalCPUTimeMs) + assert.Equal(t, uint64(2), top[1].totalCPUTimeMs) + assert.Equal(t, uint64(1), evicted[0].totalCPUTimeMs) +} + +func Test_records_toProto(t *testing.T) { + rs := records{{}, {}} + pb := rs.toProto() + assert.Len(t, pb, 2) +} + +func Test_collecting_getOrCreateRecord(t *testing.T) { + c := newCollecting() + r1 := c.getOrCreateRecord([]byte("SQL-1"), []byte("PLAN-1")) + assert.NotNil(t, r1) + r2 := c.getOrCreateRecord([]byte("SQL-1"), []byte("PLAN-1")) + assert.Equal(t, r1, r2) +} + +func Test_collecting_markAsEvicted_hasEvicted(t *testing.T) { + c := newCollecting() + c.markAsEvicted(1, []byte("SQL-1"), []byte("PLAN-1")) + assert.True(t, c.hasEvicted(1, []byte("SQL-1"), []byte("PLAN-1"))) + assert.False(t, c.hasEvicted(1, []byte("SQL-2"), []byte("PLAN-2"))) + assert.False(t, c.hasEvicted(2, []byte("SQL-1"), []byte("PLAN-1"))) +} + +func Test_collecting_appendOthers(t *testing.T) { + c := newCollecting() + c.appendOthersCPUTime(1, 1) + c.appendOthersCPUTime(2, 2) + c.appendOthersStmtStatsItem(1, stmtstats.StatementStatsItem{ExecCount: 1}) + c.appendOthersStmtStatsItem(2, 
stmtstats.StatementStatsItem{ExecCount: 2}) + r := c.records[keyOthers] + assert.Len(t, r.tsItems, 2) + assert.Len(t, r.tsIndex, 2) + assert.Equal(t, uint64(1), r.tsItems[0].timestamp) + assert.Equal(t, uint64(2), r.tsItems[1].timestamp) + assert.Equal(t, uint32(1), r.tsItems[0].cpuTimeMs) + assert.Equal(t, uint32(2), r.tsItems[1].cpuTimeMs) + assert.Equal(t, uint64(1), r.tsItems[0].stmtStats.ExecCount) + assert.Equal(t, uint64(2), r.tsItems[1].stmtStats.ExecCount) +} + +func Test_collecting_compactToTopNAndOthers(t *testing.T) { + c := newCollecting() + c.getOrCreateRecord([]byte("SQL-1"), []byte("PLAN-1")).appendCPUTime(1, 1) + c.getOrCreateRecord([]byte("SQL-2"), []byte("PLAN-2")).appendCPUTime(1, 2) + c.getOrCreateRecord([]byte("SQL-3"), []byte("PLAN-3")).appendCPUTime(1, 3) + rs := c.compactToTopNAndOthers(1) + assert.Len(t, rs, 2) + assert.Equal(t, []byte("SQL-3"), rs[0].sqlDigest) + assert.Equal(t, []byte("PLAN-3"), rs[0].planDigest) + assert.Equal(t, uint64(3), rs[0].totalCPUTimeMs) + assert.Len(t, rs[0].tsItems, 1) + assert.Equal(t, uint32(3), rs[0].tsItems[0].cpuTimeMs) + assert.Equal(t, uint64(3), rs[1].totalCPUTimeMs) // 1 + 2 = 3 +} + +func Test_collecting_take(t *testing.T) { + c1 := newCollecting() + c1.getOrCreateRecord([]byte("SQL-1"), []byte("PLAN-1")).appendCPUTime(1, 1) + c2 := c1.take() + assert.Empty(t, c1.records) + assert.Len(t, c2.records, 1) + assert.NotEqual(t, c1.keyBuf, c2.keyBuf) +} + +func Test_cpuRecords_Sort(t *testing.T) { + rs := cpuRecords{ + {CPUTimeMs: 1}, + {CPUTimeMs: 3}, + {CPUTimeMs: 2}, + } + sort.Sort(rs) + assert.Equal(t, uint32(3), rs[0].CPUTimeMs) + assert.Equal(t, uint32(2), rs[1].CPUTimeMs) + assert.Equal(t, uint32(1), rs[2].CPUTimeMs) +} + +func Test_cpuRecords_topN(t *testing.T) { + rs := cpuRecords{ + {CPUTimeMs: 1}, + {CPUTimeMs: 3}, + {CPUTimeMs: 2}, + } + top, evicted := rs.topN(2) + assert.Len(t, top, 2) + assert.Len(t, evicted, 1) + assert.Equal(t, uint32(3), top[0].CPUTimeMs) + assert.Equal(t, uint32(2), top[1].CPUTimeMs) + assert.Equal(t, uint32(1), evicted[0].CPUTimeMs) +} + +func Test_normalizedSQLMap_register(t *testing.T) { + variable.TopSQLVariable.MaxCollect.Store(2) + m := newNormalizedSQLMap() + m.register([]byte("SQL-1"), "SQL-1", true) + m.register([]byte("SQL-2"), "SQL-2", false) + m.register([]byte("SQL-3"), "SQL-3", true) + assert.Equal(t, int64(2), m.length.Load()) + v, ok := m.data.Load().(*sync.Map).Load("SQL-1") + meta := v.(sqlMeta) + assert.True(t, ok) + assert.Equal(t, "SQL-1", meta.normalizedSQL) + assert.True(t, meta.isInternal) + v, ok = m.data.Load().(*sync.Map).Load("SQL-2") + meta = v.(sqlMeta) + assert.True(t, ok) + assert.Equal(t, "SQL-2", meta.normalizedSQL) + assert.False(t, meta.isInternal) + _, ok = m.data.Load().(*sync.Map).Load("SQL-3") + assert.False(t, ok) +} + +func Test_normalizedSQLMap_take(t *testing.T) { + variable.TopSQLVariable.MaxCollect.Store(999) + m1 := newNormalizedSQLMap() + m1.register([]byte("SQL-1"), "SQL-1", true) + m1.register([]byte("SQL-2"), "SQL-2", false) + m1.register([]byte("SQL-3"), "SQL-3", true) + m2 := m1.take() + assert.Equal(t, int64(0), m1.length.Load()) + assert.Equal(t, int64(3), m2.length.Load()) + data1 := m1.data.Load().(*sync.Map) + _, ok := data1.Load("SQL-1") + assert.False(t, ok) + _, ok = data1.Load("SQL-2") + assert.False(t, ok) + _, ok = data1.Load("SQL-3") + assert.False(t, ok) + data2 := m2.data.Load().(*sync.Map) + _, ok = data2.Load("SQL-1") + assert.True(t, ok) + _, ok = data2.Load("SQL-2") + assert.True(t, ok) + _, ok = data2.Load("SQL-3") + 
assert.True(t, ok) +} + +func Test_normalizedSQLMap_toProto(t *testing.T) { + variable.TopSQLVariable.MaxCollect.Store(999) + m := newNormalizedSQLMap() + m.register([]byte("SQL-1"), "SQL-1", true) + m.register([]byte("SQL-2"), "SQL-2", false) + m.register([]byte("SQL-3"), "SQL-3", true) + pb := m.toProto() + assert.Len(t, pb, 3) + hash := map[string]tipb.SQLMeta{} + for _, meta := range pb { + hash[meta.NormalizedSql] = meta + } + assert.Equal(t, tipb.SQLMeta{ + SqlDigest: []byte("SQL-1"), + NormalizedSql: "SQL-1", + IsInternalSql: true, + }, hash["SQL-1"]) + assert.Equal(t, tipb.SQLMeta{ + SqlDigest: []byte("SQL-2"), + NormalizedSql: "SQL-2", + IsInternalSql: false, + }, hash["SQL-2"]) + assert.Equal(t, tipb.SQLMeta{ + SqlDigest: []byte("SQL-3"), + NormalizedSql: "SQL-3", + IsInternalSql: true, + }, hash["SQL-3"]) +} + +func Test_normalizedPlanMap_register(t *testing.T) { + variable.TopSQLVariable.MaxCollect.Store(2) + m := newNormalizedPlanMap() + m.register([]byte("PLAN-1"), "PLAN-1") + m.register([]byte("PLAN-2"), "PLAN-2") + m.register([]byte("PLAN-3"), "PLAN-3") + assert.Equal(t, int64(2), m.length.Load()) + v, ok := m.data.Load().(*sync.Map).Load("PLAN-1") + assert.True(t, ok) + assert.Equal(t, "PLAN-1", v.(string)) + v, ok = m.data.Load().(*sync.Map).Load("PLAN-2") + assert.True(t, ok) + assert.Equal(t, "PLAN-2", v.(string)) + _, ok = m.data.Load().(*sync.Map).Load("PLAN-3") + assert.False(t, ok) +} + +func Test_normalizedPlanMap_take(t *testing.T) { + variable.TopSQLVariable.MaxCollect.Store(999) + m1 := newNormalizedPlanMap() + m1.register([]byte("PLAN-1"), "PLAN-1") + m1.register([]byte("PLAN-2"), "PLAN-2") + m1.register([]byte("PLAN-3"), "PLAN-3") + m2 := m1.take() + assert.Equal(t, int64(0), m1.length.Load()) + assert.Equal(t, int64(3), m2.length.Load()) + data1 := m1.data.Load().(*sync.Map) + _, ok := data1.Load("PLAN-1") + assert.False(t, ok) + _, ok = data1.Load("PLAN-2") + assert.False(t, ok) + _, ok = data1.Load("PLAN-3") + assert.False(t, ok) + data2 := m2.data.Load().(*sync.Map) + _, ok = data2.Load("PLAN-1") + assert.True(t, ok) + _, ok = data2.Load("PLAN-2") + assert.True(t, ok) + _, ok = data2.Load("PLAN-3") + assert.True(t, ok) +} + +func Test_normalizedPlanMap_toProto(t *testing.T) { + variable.TopSQLVariable.MaxCollect.Store(999) + m := newNormalizedPlanMap() + m.register([]byte("PLAN-1"), "PLAN-1") + m.register([]byte("PLAN-2"), "PLAN-2") + m.register([]byte("PLAN-3"), "PLAN-3") + pb := m.toProto(func(s string) (string, error) { return s, nil }) + assert.Len(t, pb, 3) + hash := map[string]tipb.PlanMeta{} + for _, meta := range pb { + hash[meta.NormalizedPlan] = meta + } + assert.Equal(t, tipb.PlanMeta{ + PlanDigest: []byte("PLAN-1"), + NormalizedPlan: "PLAN-1", + }, hash["PLAN-1"]) + assert.Equal(t, tipb.PlanMeta{ + PlanDigest: []byte("PLAN-2"), + NormalizedPlan: "PLAN-2", + }, hash["PLAN-2"]) + assert.Equal(t, tipb.PlanMeta{ + PlanDigest: []byte("PLAN-3"), + NormalizedPlan: "PLAN-3", + }, hash["PLAN-3"]) +} + +func Test_encodeKey(t *testing.T) { + buf := bytes.NewBuffer(make([]byte, 0, 64)) + key := encodeKey(buf, []byte("S"), []byte("P")) + assert.Equal(t, "SP", key) +} diff --git a/util/topsql/reporter/datasink.go b/util/topsql/reporter/datasink.go index 2196be54a0f93..abc24a7a02f7c 100644 --- a/util/topsql/reporter/datasink.go +++ b/util/topsql/reporter/datasink.go @@ -14,7 +14,15 @@ package reporter -import "time" +import ( + "context" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/sessionctx/variable" + 
"github.com/pingcap/tipb/go-tipb" +) // DataSink collects and sends data to a target. type DataSink interface { @@ -26,3 +34,74 @@ type DataSink interface { // OnReporterClosing notifies DataSink that the reporter is closing. OnReporterClosing() } + +// DataSinkRegisterer is for registering DataSink +type DataSinkRegisterer interface { + Register(dataSink DataSink) error + Deregister(dataSink DataSink) +} + +// ReportData contains data that reporter sends to the agent. +type ReportData struct { + // DataRecords contains the compactToTopNAndOthers records []tipb.TopSQLRecord and the `others` + // record which aggregation all []tipb.TopSQLRecord that is out of Top N. + DataRecords []tipb.TopSQLRecord + SQLMetas []tipb.SQLMeta + PlanMetas []tipb.PlanMeta +} + +func (d *ReportData) hasData() bool { + return len(d.DataRecords) != 0 || len(d.SQLMetas) != 0 || len(d.PlanMetas) != 0 +} + +var _ DataSinkRegisterer = &DefaultDataSinkRegisterer{} + +// DefaultDataSinkRegisterer implements DataSinkRegisterer. +type DefaultDataSinkRegisterer struct { + sync.Mutex + ctx context.Context + dataSinks map[DataSink]struct{} +} + +// NewDefaultDataSinkRegisterer creates a new DefaultDataSinkRegisterer which implements DataSinkRegisterer. +func NewDefaultDataSinkRegisterer(ctx context.Context) DefaultDataSinkRegisterer { + return DefaultDataSinkRegisterer{ + ctx: ctx, + dataSinks: make(map[DataSink]struct{}, 10), + } +} + +// Register implements DataSinkRegisterer. +func (r *DefaultDataSinkRegisterer) Register(dataSink DataSink) error { + r.Lock() + defer r.Unlock() + + select { + case <-r.ctx.Done(): + return errors.New("DefaultDataSinkRegisterer closed") + default: + if len(r.dataSinks) >= 10 { + return errors.New("too many datasinks") + } + r.dataSinks[dataSink] = struct{}{} + if len(r.dataSinks) > 0 { + variable.TopSQLVariable.Enable.Store(true) + } + return nil + } +} + +// Deregister implements DataSinkRegisterer. +func (r *DefaultDataSinkRegisterer) Deregister(dataSink DataSink) { + r.Lock() + defer r.Unlock() + + select { + case <-r.ctx.Done(): + default: + delete(r.dataSinks, dataSink) + if len(r.dataSinks) == 0 { + variable.TopSQLVariable.Enable.Store(false) + } + } +} diff --git a/util/topsql/reporter/datasink_test.go b/util/topsql/reporter/datasink_test.go new file mode 100644 index 0000000000000..d6a781e0148bb --- /dev/null +++ b/util/topsql/reporter/datasink_test.go @@ -0,0 +1,58 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package reporter + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestDefaultDataSinkRegisterer(t *testing.T) { + var err error + r := NewDefaultDataSinkRegisterer(context.Background()) + m1 := newMockDataSink2() + m2 := newMockDataSink2() + err = r.Register(m1) + assert.NoError(t, err) + err = r.Register(m2) + assert.NoError(t, err) + assert.Len(t, r.dataSinks, 2) + r.Deregister(m1) + r.Deregister(m2) + assert.Empty(t, r.dataSinks) +} + +type mockDataSink2 struct { + data []*ReportData + closed bool +} + +func newMockDataSink2() *mockDataSink2 { + return &mockDataSink2{ + data: []*ReportData{}, + } +} + +func (m *mockDataSink2) TrySend(data *ReportData, deadline time.Time) error { + m.data = append(m.data, data) + return nil +} + +func (m *mockDataSink2) OnReporterClosing() { + m.closed = true +} diff --git a/util/topsql/reporter/metrics.go b/util/topsql/reporter/metrics.go new file mode 100644 index 0000000000000..0314956a6c795 --- /dev/null +++ b/util/topsql/reporter/metrics.go @@ -0,0 +1,36 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reporter + +import "github.com/pingcap/tidb/metrics" + +var ( + ignoreExceedSQLCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_exceed_sql") + ignoreExceedPlanCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_exceed_plan") + ignoreCollectChannelFullCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_collect_channel_full") + ignoreCollectStmtChannelFullCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_collect_stmt_channel_full") + ignoreReportChannelFullCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_report_channel_full") + reportAllDurationSuccHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("all", metrics.LblOK) + reportAllDurationFailedHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("all", metrics.LblError) + reportRecordDurationSuccHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("record", metrics.LblOK) + reportRecordDurationFailedHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("record", metrics.LblError) + reportSQLDurationSuccHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("sql", metrics.LblOK) + reportSQLDurationFailedHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("sql", metrics.LblError) + reportPlanDurationSuccHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("plan", metrics.LblOK) + reportPlanDurationFailedHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("plan", metrics.LblError) + topSQLReportRecordCounterHistogram = metrics.TopSQLReportDataHistogram.WithLabelValues("record") + topSQLReportSQLCountHistogram = metrics.TopSQLReportDataHistogram.WithLabelValues("sql") + topSQLReportPlanCountHistogram = metrics.TopSQLReportDataHistogram.WithLabelValues("plan") +) diff --git a/util/topsql/reporter/mock/server.go 
b/util/topsql/reporter/mock/server.go index 21737e1e6ca9b..59de46443c6e4 100644 --- a/util/topsql/reporter/mock/server.go +++ b/util/topsql/reporter/mock/server.go @@ -34,7 +34,7 @@ type mockAgentServer struct { grpcServer *grpc.Server sqlMetas map[string]tipb.SQLMeta planMetas map[string]string - records [][]*tipb.CPUTimeRecord + records [][]*tipb.TopSQLRecord hang struct { beginTime atomic.Value // time.Time endTime atomic.Value // time.Time @@ -85,8 +85,8 @@ func (svr *mockAgentServer) mayHang() { } } -func (svr *mockAgentServer) ReportCPUTimeRecords(stream tipb.TopSQLAgent_ReportCPUTimeRecordsServer) error { - records := make([]*tipb.CPUTimeRecord, 0, 10) +func (svr *mockAgentServer) ReportTopSQLRecords(stream tipb.TopSQLAgent_ReportTopSQLRecordsServer) error { + records := make([]*tipb.TopSQLRecord, 0, 10) for { svr.mayHang() req, err := stream.Recv() @@ -180,10 +180,10 @@ func (svr *mockAgentServer) GetPlanMetaByDigestBlocking(digest []byte, timeout t } } -func (svr *mockAgentServer) GetLatestRecords() []*tipb.CPUTimeRecord { +func (svr *mockAgentServer) GetLatestRecords() []*tipb.TopSQLRecord { svr.Lock() records := svr.records - svr.records = [][]*tipb.CPUTimeRecord{} + svr.records = [][]*tipb.TopSQLRecord{} svr.Unlock() if len(records) == 0 { diff --git a/util/topsql/reporter/pubsub.go b/util/topsql/reporter/pubsub.go index 7d01c077e058f..62e6d2af8c412 100644 --- a/util/topsql/reporter/pubsub.go +++ b/util/topsql/reporter/pubsub.go @@ -146,7 +146,7 @@ func (ds *pubSubDataSink) run() error { } func (ds *pubSubDataSink) doSend(ctx context.Context, data *ReportData) error { - if err := ds.sendCPUTime(ctx, data.CPUTimeRecords); err != nil { + if err := ds.sendTopSQLRecords(ctx, data.DataRecords); err != nil { return err } if err := ds.sendSQLMeta(ctx, data.SQLMetas); err != nil { @@ -155,7 +155,7 @@ func (ds *pubSubDataSink) doSend(ctx context.Context, data *ReportData) error { return ds.sendPlanMeta(ctx, data.PlanMetas) } -func (ds *pubSubDataSink) sendCPUTime(ctx context.Context, records []tipb.CPUTimeRecord) (err error) { +func (ds *pubSubDataSink) sendTopSQLRecords(ctx context.Context, records []tipb.TopSQLRecord) (err error) { if len(records) == 0 { return } @@ -171,11 +171,11 @@ func (ds *pubSubDataSink) sendCPUTime(ctx context.Context, records []tipb.CPUTim } }() - cpuRecord := &tipb.TopSQLSubResponse_Record{} - r := &tipb.TopSQLSubResponse{RespOneof: cpuRecord} + topSQLRecord := &tipb.TopSQLSubResponse_Record{} + r := &tipb.TopSQLSubResponse{RespOneof: topSQLRecord} for i := range records { - cpuRecord.Record = &records[i] + topSQLRecord.Record = &records[i] if err = ds.stream.Send(r); err != nil { return } diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go index 879ac0d61e438..2e6c442815473 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -15,517 +15,245 @@ package reporter import ( - "bytes" "context" - "errors" - "sort" - "sync" - "sync/atomic" "time" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/pingcap/tidb/util/topsql/tracecpu" - "github.com/pingcap/tipb/go-tipb" - "github.com/wangjohn/quickselect" - atomic2 "go.uber.org/atomic" "go.uber.org/zap" ) const ( - dialTimeout = 5 * time.Second - reportTimeout = 40 * time.Second - grpcInitialWindowSize = 1 << 30 - grpcInitialConnWindowSize = 1 << 30 - // keyOthers 
is the key to store the aggregation of all records that is out of Top N. - keyOthers = "" + reportTimeout = 40 * time.Second + collectChanBufferSize = 2 ) -var _ TopSQLReporter = &RemoteTopSQLReporter{} +var nowFunc = time.Now // TopSQLReporter collects Top SQL metrics. type TopSQLReporter interface { tracecpu.Collector + stmtstats.Collector + + // RegisterSQL registers a normalizedSQL with SQLDigest. + // + // Note that the normalized SQL string can be of >1M long. + // This function should be thread-safe, which means concurrently calling it + // in several goroutines should be fine. It should also return immediately, + // and do any CPU-intensive job asynchronously. RegisterSQL(sqlDigest []byte, normalizedSQL string, isInternal bool) - RegisterPlan(planDigest []byte, normalizedPlan string) - Close() -} - -// DataSinkRegisterer is for registering DataSink -type DataSinkRegisterer interface { - Register(dataSink DataSink) error - Deregister(dataSink DataSink) -} - -type cpuData struct { - timestamp uint64 - records []tracecpu.SQLCPUTimeRecord -} - -// dataPoints represents the cumulative SQL plan CPU time in current minute window -// dataPoints do not guarantee the TimestampList is sorted by timestamp when there is a time jump backward. -type dataPoints struct { - SQLDigest []byte - PlanDigest []byte - TimestampList []uint64 - CPUTimeMsList []uint32 - CPUTimeMsTotal uint64 -} - -func (d *dataPoints) isInvalid() bool { - return len(d.TimestampList) != len(d.CPUTimeMsList) -} - -func (d *dataPoints) Len() int { - return len(d.TimestampList) -} - -func (d *dataPoints) Less(i, j int) bool { - // sort by timestamp - return d.TimestampList[i] < d.TimestampList[j] -} -func (d *dataPoints) Swap(i, j int) { - d.TimestampList[i], d.TimestampList[j] = d.TimestampList[j], d.TimestampList[i] - d.CPUTimeMsList[i], d.CPUTimeMsList[j] = d.CPUTimeMsList[j], d.CPUTimeMsList[i] -} - -type dataPointsOrderByCPUTime []*dataPoints - -func (t dataPointsOrderByCPUTime) Len() int { - return len(t) -} - -func (t dataPointsOrderByCPUTime) Less(i, j int) bool { - // We need find the kth largest value, so here should use > - return t[i].CPUTimeMsTotal > t[j].CPUTimeMsTotal -} -func (t dataPointsOrderByCPUTime) Swap(i, j int) { - t[i], t[j] = t[j], t[i] -} - -type sqlCPUTimeRecordSlice []tracecpu.SQLCPUTimeRecord -func (t sqlCPUTimeRecordSlice) Len() int { - return len(t) -} + // RegisterPlan like RegisterSQL, but for normalized plan strings. + RegisterPlan(planDigest []byte, normalizedPlan string) -func (t sqlCPUTimeRecordSlice) Less(i, j int) bool { - // We need find the kth largest value, so here should use > - return t[i].CPUTimeMs > t[j].CPUTimeMs -} -func (t sqlCPUTimeRecordSlice) Swap(i, j int) { - t[i], t[j] = t[j], t[i] + // Close uses to close and release the reporter resource. + Close() } -type planBinaryDecodeFunc func(string) (string, error) +var _ TopSQLReporter = &RemoteTopSQLReporter{} +var _ DataSinkRegisterer = &RemoteTopSQLReporter{} -// RemoteTopSQLReporter implements a TopSQL reporter that sends data to a remote agent -// This should be called periodically to collect TopSQL resource usage metrics +// RemoteTopSQLReporter implements TopSQLReporter that sends data to a remote agent. +// This should be called periodically to collect TopSQL resource usage metrics. 
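Before the struct definition below, a hedged sketch of plugging a custom sink into the DefaultDataSinkRegisterer introduced in datasink.go above (the exampleSink type and function names are hypothetical; the real test file uses a similar mock):

type exampleSink struct{ sent []*ReportData }

func (s *exampleSink) TrySend(data *ReportData, _ time.Time) error {
	s.sent = append(s.sent, data) // a real sink would forward the data to an agent before the deadline
	return nil
}

func (s *exampleSink) OnReporterClosing() {}

func exampleRegisterSink() {
	r := NewDefaultDataSinkRegisterer(context.Background())
	sink := &exampleSink{}
	if err := r.Register(sink); err != nil { // the first Register flips TopSQLVariable.Enable on
		return
	}
	defer r.Deregister(sink) // the last Deregister flips it back off
}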
type RemoteTopSQLReporter struct { + DefaultDataSinkRegisterer + ctx context.Context cancel context.CancelFunc - dataSinkMu sync.Mutex - dataSinks map[DataSink]struct{} - - // normalizedSQLMap is an map, whose keys are SQL digest strings and values are SQLMeta. - normalizedSQLMap atomic.Value // sync.Map - sqlMapLength atomic2.Int64 - - // normalizedPlanMap is an map, whose keys are plan digest strings and values are normalized plans **in binary**. - // The normalized plans in binary can be decoded to string using the `planBinaryDecoder`. - normalizedPlanMap atomic.Value // sync.Map - planMapLength atomic2.Int64 - - collectCPUDataChan chan cpuData + collectCPUTimeChan chan []tracecpu.SQLCPUTimeRecord + collectStmtStatsChan chan stmtstats.StatementStatsMap reportCollectedDataChan chan collectedData - // calling decodePlan this can take a while, so should not block critical paths - decodePlan planBinaryDecodeFunc -} + collecting *collecting + normalizedSQLMap *normalizedSQLMap + normalizedPlanMap *normalizedPlanMap + stmtStatsBuffer map[uint64]stmtstats.StatementStatsMap // timestamp => stmtstats.StatementStatsMap -// SQLMeta is the SQL meta which contains the normalized SQL string and a bool field which uses to distinguish internal SQL. -type SQLMeta struct { - normalizedSQL string - isInternal bool + // calling decodePlan this can take a while, so should not block critical paths. + decodePlan planBinaryDecodeFunc } -// NewRemoteTopSQLReporter creates a new TopSQL reporter +// NewRemoteTopSQLReporter creates a new RemoteTopSQLReporter. // -// planBinaryDecoder is a decoding function which will be called asynchronously to decode the plan binary to string -// MaxStatementsNum is the maximum SQL and plan number, which will restrict the memory usage of the internal LFU cache +// decodePlan is a decoding function which will be called asynchronously to decode the plan binary to string. 
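A hedged sketch of wiring the constructor below together with a sink (hypothetical function; it reuses the exampleSink sketched earlier, and the identity callback stands in for the real plan decoder):

func exampleNewReporter() {
	reporter := NewRemoteTopSQLReporter(func(encoded string) (string, error) { return encoded, nil })
	defer reporter.Close()

	sink := &exampleSink{}
	if err := reporter.Register(sink); err != nil { // promoted from the embedded DefaultDataSinkRegisterer
		return
	}
	defer reporter.Deregister(sink)
	// From here on, Collect / CollectStmtStatsMap / RegisterSQL / RegisterPlan feed data that the
	// report worker eventually pushes to the sink via TrySend.
}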
func NewRemoteTopSQLReporter(decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter { ctx, cancel := context.WithCancel(context.Background()) tsr := &RemoteTopSQLReporter{ - ctx: ctx, - cancel: cancel, - - dataSinks: make(map[DataSink]struct{}, 10), - - collectCPUDataChan: make(chan cpuData, 1), - reportCollectedDataChan: make(chan collectedData, 1), - decodePlan: decodePlan, + DefaultDataSinkRegisterer: NewDefaultDataSinkRegisterer(ctx), + ctx: ctx, + cancel: cancel, + collectCPUTimeChan: make(chan []tracecpu.SQLCPUTimeRecord, collectChanBufferSize), + collectStmtStatsChan: make(chan stmtstats.StatementStatsMap, collectChanBufferSize), + reportCollectedDataChan: make(chan collectedData, 1), + collecting: newCollecting(), + normalizedSQLMap: newNormalizedSQLMap(), + normalizedPlanMap: newNormalizedPlanMap(), + stmtStatsBuffer: map[uint64]stmtstats.StatementStatsMap{}, + decodePlan: decodePlan, } - tsr.normalizedSQLMap.Store(&sync.Map{}) - tsr.normalizedPlanMap.Store(&sync.Map{}) - go tsr.collectWorker() go tsr.reportWorker() - return tsr } -var ( - ignoreExceedSQLCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_exceed_sql") - ignoreExceedPlanCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_exceed_plan") - ignoreCollectChannelFullCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_collect_channel_full") - ignoreReportChannelFullCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_report_channel_full") - reportAllDurationSuccHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("all", metrics.LblOK) - reportAllDurationFailedHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("all", metrics.LblError) - reportRecordDurationSuccHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("record", metrics.LblOK) - reportRecordDurationFailedHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("record", metrics.LblError) - reportSQLDurationSuccHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("sql", metrics.LblOK) - reportSQLDurationFailedHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("sql", metrics.LblError) - reportPlanDurationSuccHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("plan", metrics.LblOK) - reportPlanDurationFailedHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("plan", metrics.LblError) - topSQLReportRecordCounterHistogram = metrics.TopSQLReportDataHistogram.WithLabelValues("record") - topSQLReportSQLCountHistogram = metrics.TopSQLReportDataHistogram.WithLabelValues("sql") - topSQLReportPlanCountHistogram = metrics.TopSQLReportDataHistogram.WithLabelValues("plan") -) - -// RegisterSQL registers a normalized SQL string to a SQL digest. -// This function is thread-safe and efficient. +// Collect implements tracecpu.Collector. // -// Note that the normalized SQL string can be of >1M long. -// This function should be thread-safe, which means parallelly calling it in several goroutines should be fine. -// It should also return immediately, and do any CPU-intensive job asynchronously. 
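// Sketch: the intended call pattern for the constructor above, written as it
// would appear in this package's tests; the no-op plan decoder is an assumption
// for illustration only (real wiring is done in util/topsql/topsql.go).
package reporter

func ExampleNewRemoteTopSQLReporter() {
	noopDecode := func(binaryPlan string) (string, error) { return binaryPlan, nil }
	tsr := NewRemoteTopSQLReporter(noopDecode)
	// Close stops collectWorker/reportWorker and notifies every registered DataSink.
	defer tsr.Close()
}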
-func (tsr *RemoteTopSQLReporter) RegisterSQL(sqlDigest []byte, normalizedSQL string, isInternal bool) { - if tsr.sqlMapLength.Load() >= variable.TopSQLVariable.MaxCollect.Load() { - ignoreExceedSQLCounter.Inc() - return - } - m := tsr.normalizedSQLMap.Load().(*sync.Map) - key := string(sqlDigest) - _, loaded := m.LoadOrStore(key, SQLMeta{ - normalizedSQL: normalizedSQL, - isInternal: isInternal, - }) - if !loaded { - tsr.sqlMapLength.Add(1) - } -} - -// RegisterPlan is like RegisterSQL, but for normalized plan strings. +// WARN: It will drop the DataRecords if the processing is not in time. // This function is thread-safe and efficient. -func (tsr *RemoteTopSQLReporter) RegisterPlan(planDigest []byte, normalizedBinaryPlan string) { - if tsr.planMapLength.Load() >= variable.TopSQLVariable.MaxCollect.Load() { - ignoreExceedPlanCounter.Inc() +func (tsr *RemoteTopSQLReporter) Collect(data []tracecpu.SQLCPUTimeRecord) { + if len(data) == 0 { return } - m := tsr.normalizedPlanMap.Load().(*sync.Map) - key := string(planDigest) - _, loaded := m.LoadOrStore(key, normalizedBinaryPlan) - if !loaded { - tsr.planMapLength.Add(1) - } -} - -var _ DataSinkRegisterer = &RemoteTopSQLReporter{} - -// Register implements DataSinkRegisterer interface. -func (tsr *RemoteTopSQLReporter) Register(dataSink DataSink) error { - tsr.dataSinkMu.Lock() - defer tsr.dataSinkMu.Unlock() - - select { - case <-tsr.ctx.Done(): - return errors.New("reporter is closed") - default: - if len(tsr.dataSinks) >= 10 { - return errors.New("too many datasinks") - } - - tsr.dataSinks[dataSink] = struct{}{} - - if len(tsr.dataSinks) > 0 { - variable.TopSQLVariable.Enable.Store(true) - } - - return nil - } -} - -// Deregister implements DataSinkRegisterer interface. -func (tsr *RemoteTopSQLReporter) Deregister(dataSink DataSink) { - tsr.dataSinkMu.Lock() - defer tsr.dataSinkMu.Unlock() - select { - case <-tsr.ctx.Done(): + case tsr.collectCPUTimeChan <- data: default: - delete(tsr.dataSinks, dataSink) - - if len(tsr.dataSinks) == 0 { - variable.TopSQLVariable.Enable.Store(false) - } + // ignore if chan blocked + ignoreCollectChannelFullCounter.Inc() } } -// Collect receives CPU time records for processing. WARN: It will drop the records if the processing is not in time. +// CollectStmtStatsMap implements stmtstats.Collector. +// +// WARN: It will drop the DataRecords if the processing is not in time. // This function is thread-safe and efficient. -func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQLCPUTimeRecord) { - if len(records) == 0 { +func (tsr *RemoteTopSQLReporter) CollectStmtStatsMap(data stmtstats.StatementStatsMap) { + if len(data) == 0 { return } select { - case tsr.collectCPUDataChan <- cpuData{ - timestamp: timestamp, - records: records, - }: + case tsr.collectStmtStatsChan <- data: default: // ignore if chan blocked - ignoreCollectChannelFullCounter.Inc() + ignoreCollectStmtChannelFullCounter.Inc() } } -// Close uses to close and release the reporter resource. -func (tsr *RemoteTopSQLReporter) Close() { - tsr.cancel() - - var m map[DataSink]struct{} - tsr.dataSinkMu.Lock() - m, tsr.dataSinks = tsr.dataSinks, make(map[DataSink]struct{}) - tsr.dataSinkMu.Unlock() - - for d := range m { - d.OnReporterClosing() - } +// RegisterSQL implements TopSQLReporter. +// +// This function is thread-safe and efficient. 
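// Sketch: the drop-on-full idiom that Collect and CollectStmtStatsMap above
// rely on, reduced to a standalone program; the names here are illustrative,
// only the select/default shape mirrors the real methods.
package main

import "fmt"

func trySendInt(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// Channel buffer is full: drop the value rather than block the caller.
		return false
	}
}

func main() {
	ch := make(chan int, 2) // cf. collectChanBufferSize = 2
	for i := 0; i < 4; i++ {
		// true for the first two sends, false once the buffer is full.
		fmt.Println(trySendInt(ch, i))
	}
}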
+func (tsr *RemoteTopSQLReporter) RegisterSQL(sqlDigest []byte, normalizedSQL string, isInternal bool) { + tsr.normalizedSQLMap.register(sqlDigest, normalizedSQL, isInternal) } -func addEvictedCPUTime(collectTarget map[string]*dataPoints, timestamp uint64, totalCPUTimeMs uint32) { - if totalCPUTimeMs == 0 { - return - } - others, ok := collectTarget[keyOthers] - if !ok { - others = &dataPoints{} - collectTarget[keyOthers] = others - } - if len(others.TimestampList) == 0 { - others.TimestampList = []uint64{timestamp} - others.CPUTimeMsList = []uint32{totalCPUTimeMs} - } else { - others.TimestampList = append(others.TimestampList, timestamp) - others.CPUTimeMsList = append(others.CPUTimeMsList, totalCPUTimeMs) - } - others.CPUTimeMsTotal += uint64(totalCPUTimeMs) +// RegisterPlan implements TopSQLReporter. +// +// This function is thread-safe and efficient. +func (tsr *RemoteTopSQLReporter) RegisterPlan(planDigest []byte, normalizedPlan string) { + tsr.normalizedPlanMap.register(planDigest, normalizedPlan) } -// addEvictedIntoSortedDataPoints adds evicted dataPoints into others. -// Attention, this function depend on others dataPoints is sorted, and this function will modify evicted dataPoints -// to make sure it is sorted by timestamp. -func addEvictedIntoSortedDataPoints(others *dataPoints, evict *dataPoints) *dataPoints { - if others == nil { - others = &dataPoints{} - } - if evict == nil || len(evict.TimestampList) == 0 { - return others - } - if evict.isInvalid() { - logutil.BgLogger().Warn("[top-sql] data points is invalid, it should never happen", zap.Any("self", others), zap.Any("evict", evict)) - return others - } - // Sort the dataPoints by timestamp to fix the affect of time jump backward. - sort.Sort(evict) - if len(others.TimestampList) == 0 { - others.TimestampList = evict.TimestampList - others.CPUTimeMsList = evict.CPUTimeMsList - others.CPUTimeMsTotal = evict.CPUTimeMsTotal - return others - } - length := len(others.TimestampList) + len(evict.TimestampList) - timestampList := make([]uint64, 0, length) - cpuTimeMsList := make([]uint32, 0, length) - i := 0 - j := 0 - for i < len(others.TimestampList) && j < len(evict.TimestampList) { - if others.TimestampList[i] == evict.TimestampList[j] { - timestampList = append(timestampList, others.TimestampList[i]) - cpuTimeMsList = append(cpuTimeMsList, others.CPUTimeMsList[i]+evict.CPUTimeMsList[j]) - i++ - j++ - } else if others.TimestampList[i] < evict.TimestampList[j] { - timestampList = append(timestampList, others.TimestampList[i]) - cpuTimeMsList = append(cpuTimeMsList, others.CPUTimeMsList[i]) - i++ - } else { - timestampList = append(timestampList, evict.TimestampList[j]) - cpuTimeMsList = append(cpuTimeMsList, evict.CPUTimeMsList[j]) - j++ - } - } - if i < len(others.TimestampList) { - timestampList = append(timestampList, others.TimestampList[i:]...) - cpuTimeMsList = append(cpuTimeMsList, others.CPUTimeMsList[i:]...) - } - if j < len(evict.TimestampList) { - timestampList = append(timestampList, evict.TimestampList[j:]...) - cpuTimeMsList = append(cpuTimeMsList, evict.CPUTimeMsList[j:]...) - } - others.TimestampList = timestampList - others.CPUTimeMsList = cpuTimeMsList - others.CPUTimeMsTotal += evict.CPUTimeMsTotal - return others +// Close implements TopSQLReporter. +func (tsr *RemoteTopSQLReporter) Close() { + tsr.cancel() + tsr.onReporterClosing() } +// collectWorker consumes and collects data from tracecpu.Collector/stmtstats.Collector. 
func (tsr *RemoteTopSQLReporter) collectWorker() { defer util.Recover("top-sql", "collectWorker", nil, false) - collectedData := make(map[string]*dataPoints) currentReportInterval := variable.TopSQLVariable.ReportIntervalSeconds.Load() + collectTicker := time.NewTicker(time.Second) + defer collectTicker.Stop() reportTicker := time.NewTicker(time.Second * time.Duration(currentReportInterval)) + defer reportTicker.Stop() for { select { - case data := <-tsr.collectCPUDataChan: - // On receiving data to collect: Write to local data array, and retain records with most CPU time. - tsr.doCollect(collectedData, data.timestamp, data.records) + case <-tsr.ctx.Done(): + return + case <-collectTicker.C: + tsr.takeDataFromCollectChanBuffer() case <-reportTicker.C: - tsr.takeDataAndSendToReportChan(&collectedData) + tsr.processStmtStatsData() + tsr.takeDataAndSendToReportChan() // Update `reportTicker` if report interval changed. if newInterval := variable.TopSQLVariable.ReportIntervalSeconds.Load(); newInterval != currentReportInterval { currentReportInterval = newInterval reportTicker.Reset(time.Second * time.Duration(currentReportInterval)) } - case <-tsr.ctx.Done(): - return } } } -func encodeKey(buf *bytes.Buffer, sqlDigest, planDigest []byte) string { - buf.Reset() - buf.Write(sqlDigest) - buf.Write(planDigest) - return buf.String() -} - -func getTopNRecords(records []tracecpu.SQLCPUTimeRecord) (topN, shouldEvict []tracecpu.SQLCPUTimeRecord) { - maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) - if len(records) <= maxStmt { - return records, nil - } - if err := quickselect.QuickSelect(sqlCPUTimeRecordSlice(records), maxStmt); err != nil { - // skip eviction - return records, nil - } - return records[:maxStmt], records[maxStmt:] -} - -func getTopNDataPoints(records []*dataPoints) (topN, shouldEvict []*dataPoints) { - maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) - if len(records) <= maxStmt { - return records, nil - } - if err := quickselect.QuickSelect(dataPointsOrderByCPUTime(records), maxStmt); err != nil { - // skip eviction - return records, nil - } - return records[:maxStmt], records[maxStmt:] -} +// processCPUTimeData collects top N cpuRecords of each round into tsr.collecting, and evict the +// data that is not in top N. All the evicted cpuRecords will be summary into the others. +func (tsr *RemoteTopSQLReporter) processCPUTimeData(timestamp uint64, data cpuRecords) { + defer util.Recover("top-sql", "processCPUTimeData", nil, false) -// doCollect collects top N records of each round into collectTarget, and evict the data that is not in top N. -// All the evicted record will be summary into the collectedData.others. -func (tsr *RemoteTopSQLReporter) doCollect( - collectTarget map[string]*dataPoints, timestamp uint64, records []tracecpu.SQLCPUTimeRecord) { - defer util.Recover("top-sql", "doCollect", nil, false) - - // Get top N records of each round records. - var evicted []tracecpu.SQLCPUTimeRecord - records, evicted = getTopNRecords(records) - - keyBuf := bytes.NewBuffer(make([]byte, 0, 64)) - listCapacity := int(variable.TopSQLVariable.ReportIntervalSeconds.Load()/variable.TopSQLVariable.PrecisionSeconds.Load() + 1) - if listCapacity < 1 { - listCapacity = 1 - } - // Collect the top N records to collectTarget for each round. 
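// Sketch: the shape of the collectWorker loop above -- a fixed one-second drain
// ticker plus a report ticker whose period is re-read from configuration after
// every report; names are illustrative, not taken from the patch.
package sketch

import (
	"context"
	"time"
)

func tickerLoop(ctx context.Context, reportInterval func() time.Duration, drain, report func()) {
	cur := reportInterval()
	drainTicker := time.NewTicker(time.Second)
	defer drainTicker.Stop()
	reportTicker := time.NewTicker(cur)
	defer reportTicker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-drainTicker.C:
			drain()
		case <-reportTicker.C:
			report()
			// Pick up ReportIntervalSeconds changes without restarting the worker.
			if n := reportInterval(); n != cur {
				cur = n
				reportTicker.Reset(n)
			}
		}
	}
}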
- for _, record := range records { - key := encodeKey(keyBuf, record.SQLDigest, record.PlanDigest) - entry, exist := collectTarget[key] - if !exist { - entry = &dataPoints{ - SQLDigest: record.SQLDigest, - PlanDigest: record.PlanDigest, - CPUTimeMsList: make([]uint32, 1, listCapacity), - TimestampList: make([]uint64, 1, listCapacity), - } - entry.CPUTimeMsList[0] = record.CPUTimeMs - entry.TimestampList[0] = timestamp - collectTarget[key] = entry - } else { - entry.CPUTimeMsList = append(entry.CPUTimeMsList, record.CPUTimeMs) - entry.TimestampList = append(entry.TimestampList, timestamp) - } - entry.CPUTimeMsTotal += uint64(record.CPUTimeMs) + // Get top N cpuRecords of each round cpuRecords. Collect the top N to tsr.collecting + // for each round. SQL meta will not be evicted, since the evicted SQL can be appeared + // on other components (TiKV) TopN DataRecords. + top, evicted := data.topN(int(variable.TopSQLVariable.MaxStatementCount.Load())) + for _, r := range top { + tsr.collecting.getOrCreateRecord(r.SQLDigest, r.PlanDigest).appendCPUTime(timestamp, r.CPUTimeMs) } - if len(evicted) == 0 { return } - // Merge non Top N data as "others" (keyed by in the `keyOthers`) in `collectTarget`. - // SQL meta will not be evicted, since the evicted SQL can be appear on Other components (TiKV) TopN records. totalEvictedCPUTime := uint32(0) - for _, evict := range evicted { - totalEvictedCPUTime += evict.CPUTimeMs + for _, e := range evicted { + totalEvictedCPUTime += e.CPUTimeMs + // Mark which digests are evicted under each timestamp. + // We will determine whether the corresponding CPUTime has been evicted + // when collecting stmtstats. If so, then we can ignore it directly. + tsr.collecting.markAsEvicted(timestamp, e.SQLDigest, e.PlanDigest) + } + tsr.collecting.appendOthersCPUTime(timestamp, totalEvictedCPUTime) +} + +// processStmtStatsData collects tsr.stmtStatsBuffer into tsr.collecting. +// All the evicted items will be summary into the others. +func (tsr *RemoteTopSQLReporter) processStmtStatsData() { + defer util.Recover("top-sql", "processStmtStatsData", nil, false) + + for timestamp, data := range tsr.stmtStatsBuffer { + for digest, item := range data { + sqlDigest, planDigest := []byte(digest.SQLDigest), []byte(digest.PlanDigest) + if tsr.collecting.hasEvicted(timestamp, sqlDigest, planDigest) { + // This timestamp+sql+plan has been evicted due to low CPUTime. + tsr.collecting.appendOthersStmtStatsItem(timestamp, *item) + continue + } + tsr.collecting.getOrCreateRecord(sqlDigest, planDigest).appendStmtStatsItem(timestamp, *item) + } } - addEvictedCPUTime(collectTarget, timestamp, totalEvictedCPUTime) + tsr.stmtStatsBuffer = map[uint64]stmtstats.StatementStatsMap{} } -// takeDataAndSendToReportChan takes collected data and then send to the report channel for reporting. -func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan(collectedDataPtr *map[string]*dataPoints) { - data := collectedData{ - records: *collectedDataPtr, - normalizedSQLMap: tsr.normalizedSQLMap.Load().(*sync.Map), - normalizedPlanMap: tsr.normalizedPlanMap.Load().(*sync.Map), +func (tsr *RemoteTopSQLReporter) takeDataFromCollectChanBuffer() { + timestamp := uint64(nowFunc().Unix()) + for { + select { + case data := <-tsr.collectCPUTimeChan: + tsr.processCPUTimeData(timestamp, data) + case data := <-tsr.collectStmtStatsChan: + tsr.stmtStatsBuffer[timestamp] = data + default: + return + } } +} - // Reset data for next report. 
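// Sketch: the top-N-plus-others split that processCPUTimeData above performs;
// the real code delegates to cpuRecords.topN (defined outside this patch), so
// this standalone version simply sorts by CPU time.
package sketch

import "sort"

type cpuRec struct {
	digest    string
	cpuTimeMs uint32
}

// splitTopN keeps the n records with the most CPU time and folds the rest into
// a single "others" total, mirroring markAsEvicted/appendOthersCPUTime.
func splitTopN(records []cpuRec, n int) (top []cpuRec, othersCPUTimeMs uint32) {
	sort.Slice(records, func(i, j int) bool { return records[i].cpuTimeMs > records[j].cpuTimeMs })
	if len(records) <= n {
		return records, 0
	}
	for _, r := range records[n:] {
		othersCPUTimeMs += r.cpuTimeMs
	}
	return records[:n], othersCPUTimeMs
}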
- *collectedDataPtr = make(map[string]*dataPoints) - tsr.normalizedSQLMap.Store(&sync.Map{}) - tsr.normalizedPlanMap.Store(&sync.Map{}) - tsr.sqlMapLength.Store(0) - tsr.planMapLength.Store(0) - +// takeDataAndSendToReportChan takes records data and then send to the report channel for reporting. +func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan() { // Send to report channel. When channel is full, data will be dropped. select { - case tsr.reportCollectedDataChan <- data: + case tsr.reportCollectedDataChan <- collectedData{ + collected: tsr.collecting.take(), + normalizedSQLMap: tsr.normalizedSQLMap.take(), + normalizedPlanMap: tsr.normalizedPlanMap.take(), + }: default: // ignore if chan blocked ignoreReportChannelFullCounter.Inc() } } -type collectedData struct { - records map[string]*dataPoints - normalizedSQLMap *sync.Map - normalizedPlanMap *sync.Map -} - -// ReportData contains data that reporter sends to the agent -type ReportData struct { - // CPUTimeRecords contains the topN collected records and the `others` record which aggregation all records that is out of Top N. - CPUTimeRecords []tipb.CPUTimeRecord - SQLMetas []tipb.SQLMeta - PlanMetas []tipb.PlanMeta -} - -func (d *ReportData) hasData() bool { - return len(d.CPUTimeRecords) != 0 || len(d.SQLMetas) != 0 || len(d.PlanMetas) != 0 -} - // reportWorker sends data to the gRPC endpoint from the `reportCollectedDataChan` one by one. func (tsr *RemoteTopSQLReporter) reportWorker() { defer util.Recover("top-sql", "reportWorker", nil, false) @@ -533,108 +261,32 @@ func (tsr *RemoteTopSQLReporter) reportWorker() { for { select { case data := <-tsr.reportCollectedDataChan: - // When `reportCollectedDataChan` receives something, there could be ongoing `RegisterSQL` and `RegisterPlan` running, - // who writes to the data structure that `data` contains. So we wait for a little while to ensure that - // these writes are finished. + // When `reportCollectedDataChan` receives something, there could be ongoing + // `RegisterSQL` and `RegisterPlan` running, who writes to the data structure + // that `data` contains. So we wait for a little while to ensure that writes + // are finished. time.Sleep(time.Millisecond * 100) - report := tsr.getReportData(data) - tsr.doReport(report) + // Get top N records from records. + rs := data.collected.compactToTopNAndOthers(int(variable.TopSQLVariable.MaxStatementCount.Load())) + // Convert to protobuf data and do report. + tsr.doReport(&ReportData{ + DataRecords: rs.toProto(), + SQLMetas: data.normalizedSQLMap.toProto(), + PlanMetas: data.normalizedPlanMap.toProto(tsr.decodePlan), + }) case <-tsr.ctx.Done(): return } } } -// getReportData gets ReportData from the collectedData. -// This function will calculate the topN collected records and the `others` record which aggregation all records that is out of Top N. -func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) *ReportData { - records := getTopNFromCollected(collected) - return tsr.buildReportData(records, collected.normalizedSQLMap, collected.normalizedPlanMap) -} - -func getTopNFromCollected(collected collectedData) (records []*dataPoints) { - // Fetch TopN dataPoints. - others := collected.records[keyOthers] - delete(collected.records, keyOthers) - - records = make([]*dataPoints, 0, len(collected.records)) - for _, v := range collected.records { - records = append(records, v) - } - - // Evict all records that is out of Top N. 
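// Sketch: the "take" idiom behind takeDataAndSendToReportChan above -- hand the
// accumulated state to the report channel and leave a fresh value behind so
// collection can continue; the concrete take() implementations are outside this
// patch, so the locked-map shape below is only an assumption.
package sketch

import "sync"

type digestMap struct {
	mu   sync.Mutex
	data map[string]string // digest -> normalized SQL or plan
}

// take returns the accumulated contents and resets the map for the next window.
func (m *digestMap) take() map[string]string {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := m.data
	m.data = make(map[string]string)
	return out
}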
- var evicted []*dataPoints - records, evicted = getTopNDataPoints(records) - if others != nil { - // Sort the dataPoints by timestamp to fix the affect of time jump backward. - sort.Sort(others) - } - for _, evict := range evicted { - // SQL meta will not be evicted, since the evicted SQL can be appeared on Other components (TiKV) TopN records. - others = addEvictedIntoSortedDataPoints(others, evict) - } - - // append others which summarize all evicted item's cpu-time. - if others != nil && others.CPUTimeMsTotal > 0 { - records = append(records, others) - } - - return -} - -// buildReportData convert record data in dataPoints slice and meta data in sync.Map to ReportData. -// -// Attention, caller should guarantee no more reader or writer access `sqlMap` and `planMap`, because buildReportData -// will do heavy jobs in sync.Map.Range and it may block other readers and writers. -func (tsr *RemoteTopSQLReporter) buildReportData(records []*dataPoints, sqlMap *sync.Map, planMap *sync.Map) *ReportData { - res := &ReportData{ - CPUTimeRecords: make([]tipb.CPUTimeRecord, 0, len(records)), - SQLMetas: make([]tipb.SQLMeta, 0, len(records)), - PlanMetas: make([]tipb.PlanMeta, 0, len(records)), - } - - for _, record := range records { - res.CPUTimeRecords = append(res.CPUTimeRecords, tipb.CPUTimeRecord{ - RecordListTimestampSec: record.TimestampList, - RecordListCpuTimeMs: record.CPUTimeMsList, - SqlDigest: record.SQLDigest, - PlanDigest: record.PlanDigest, - }) - } - - sqlMap.Range(func(key, value interface{}) bool { - meta := value.(SQLMeta) - res.SQLMetas = append(res.SQLMetas, tipb.SQLMeta{ - SqlDigest: []byte(key.(string)), - NormalizedSql: meta.normalizedSQL, - IsInternalSql: meta.isInternal, - }) - return true - }) - - planMap.Range(func(key, value interface{}) bool { - planDecoded, errDecode := tsr.decodePlan(value.(string)) - if errDecode != nil { - logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode)) - return true - } - res.PlanMetas = append(res.PlanMetas, tipb.PlanMeta{ - PlanDigest: []byte(key.(string)), - NormalizedPlan: planDecoded, - }) - return true - }) - - return res -} - +// doReport sends ReportData to DataSinks. func (tsr *RemoteTopSQLReporter) doReport(data *ReportData) { defer util.Recover("top-sql", "doReport", nil, false) if !data.hasData() { return } - timeout := reportTimeout failpoint.Inject("resetTimeoutForTest", func(val failpoint.Value) { if val.(bool) { @@ -644,18 +296,39 @@ func (tsr *RemoteTopSQLReporter) doReport(data *ReportData) { } } }) - deadline := time.Now().Add(timeout) + _ = tsr.trySend(data, time.Now().Add(timeout)) +} - tsr.dataSinkMu.Lock() +// trySend sends ReportData to all internal registered DataSinks. +func (tsr *RemoteTopSQLReporter) trySend(data *ReportData, deadline time.Time) error { + tsr.DefaultDataSinkRegisterer.Lock() dataSinks := make([]DataSink, 0, len(tsr.dataSinks)) for ds := range tsr.dataSinks { dataSinks = append(dataSinks, ds) } - tsr.dataSinkMu.Unlock() - + tsr.DefaultDataSinkRegisterer.Unlock() for _, ds := range dataSinks { if err := ds.TrySend(data, deadline); err != nil { logutil.BgLogger().Warn("[top-sql] failed to send data to datasink", zap.Error(err)) } } + return nil +} + +// onReporterClosing calls the OnReporterClosing method of all internally registered DataSinks. 
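// Sketch: a minimal DataSink for the trySend / onReporterClosing fan-out above;
// the channel-backed implementation is an assumption for illustration (the real
// sinks are SingleTargetDataSink and the pub/sub sink).
package sketch

import (
	"errors"
	"time"
)

type reportData struct{} // stand-in for reporter.ReportData

type chanSink struct{ ch chan *reportData }

// TrySend blocks at most until deadline, matching the DataSink contract used above.
func (s *chanSink) TrySend(data *reportData, deadline time.Time) error {
	select {
	case s.ch <- data:
		return nil
	case <-time.After(time.Until(deadline)):
		return errors.New("deadline exceeded before the sink accepted the data")
	}
}

// OnReporterClosing releases the consumer when the reporter shuts down.
func (s *chanSink) OnReporterClosing() { close(s.ch) }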
+func (tsr *RemoteTopSQLReporter) onReporterClosing() { + var m map[DataSink]struct{} + tsr.DefaultDataSinkRegisterer.Lock() + m, tsr.dataSinks = tsr.dataSinks, make(map[DataSink]struct{}) + tsr.DefaultDataSinkRegisterer.Unlock() + for d := range m { + d.OnReporterClosing() + } +} + +// collectedData is used for transmission in the channel. +type collectedData struct { + collected *collecting + normalizedSQLMap *normalizedSQLMap + normalizedPlanMap *normalizedPlanMap } diff --git a/util/topsql/reporter/reporter_test.go b/util/topsql/reporter/reporter_test.go index df251165de02d..3b80d51041676 100644 --- a/util/topsql/reporter/reporter_test.go +++ b/util/topsql/reporter/reporter_test.go @@ -55,7 +55,7 @@ func populateCache(tsr *RemoteTopSQLReporter, begin, end int, timestamp uint64) CPUTimeMs: uint32(i + 1), }) } - tsr.Collect(timestamp, records) + tsr.processCPUTimeData(timestamp, records) // sleep a while for the asynchronous collect time.Sleep(100 * time.Millisecond) } @@ -126,13 +126,10 @@ func TestCollectAndSendBatch(t *testing.T) { require.NoError(t, err) id = n } - require.Len(t, req.RecordListCpuTimeMs, 1) - for i := range req.RecordListCpuTimeMs { - require.Equal(t, uint32(id), req.RecordListCpuTimeMs[i]) - } - require.Len(t, req.RecordListTimestampSec, 1) - for i := range req.RecordListTimestampSec { - require.Equal(t, uint64(1), req.RecordListTimestampSec[i]) + require.Len(t, req.Items, 1) + for i := range req.Items { + require.Equal(t, uint64(1), req.Items[i].TimestampSec) + require.Equal(t, uint32(id), req.Items[i].CpuTimeMs) } sqlMeta, exist := agentServer.GetSQLMetaByDigestBlocking(req.SqlDigest, time.Second) require.True(t, exist) @@ -168,19 +165,18 @@ func TestCollectAndEvicted(t *testing.T) { require.NoError(t, err) id = n } - require.Len(t, req.RecordListTimestampSec, 1) - require.Equal(t, uint64(2), req.RecordListTimestampSec[0]) - require.Len(t, req.RecordListCpuTimeMs, 1) + require.Len(t, req.Items, 1) + require.Equal(t, uint64(2), req.Items[0].TimestampSec) if id == 0 { // test for others require.Nil(t, req.SqlDigest) require.Nil(t, req.PlanDigest) // 12502500 is the sum of all evicted item's cpu time. 1 + 2 + 3 + ... 
+ 5000 = (1 + 5000) * 2500 = 12502500 - require.Equal(t, 12502500, int(req.RecordListCpuTimeMs[0])) + require.Equal(t, 12502500, int(req.Items[0].CpuTimeMs)) continue } require.Greater(t, id, maxSQLNum) - require.Equal(t, uint32(id), req.RecordListCpuTimeMs[0]) + require.Equal(t, uint32(id), req.Items[0].CpuTimeMs) sqlMeta, exist := agentServer.GetSQLMetaByDigestBlocking(req.SqlDigest, time.Second) require.True(t, exist) require.Equal(t, "sqlNormalized"+strconv.Itoa(id), sqlMeta.NormalizedSql) @@ -207,7 +203,7 @@ func newSQLCPUTimeRecord(tsr *RemoteTopSQLReporter, sqlID int, cpuTimeMs uint32) } func collectAndWait(tsr *RemoteTopSQLReporter, timestamp uint64, records []tracecpu.SQLCPUTimeRecord) { - tsr.Collect(timestamp, records) + tsr.processCPUTimeData(timestamp, records) time.Sleep(time.Millisecond * 100) } @@ -262,17 +258,23 @@ func TestCollectAndTopN(t *testing.T) { sort.Slice(results, func(i, j int) bool { return string(results[i].SqlDigest) < string(results[j].SqlDigest) }) - getTotalCPUTime := func(record *tipb.CPUTimeRecord) int { + getTotalCPUTime := func(record *tipb.TopSQLRecord) int { total := uint32(0) - for _, v := range record.RecordListCpuTimeMs { - total += v + for _, i := range record.Items { + total += i.CpuTimeMs } return int(total) } require.Nil(t, results[0].SqlDigest) require.Equal(t, 5, getTotalCPUTime(results[0])) - require.Equal(t, []uint64{0, 1, 3, 4}, results[0].RecordListTimestampSec) - require.Equal(t, []uint32{1, 2, 1, 1}, results[0].RecordListCpuTimeMs) + require.Equal(t, uint64(0), results[0].Items[0].TimestampSec) + require.Equal(t, uint64(1), results[0].Items[1].TimestampSec) + require.Equal(t, uint64(3), results[0].Items[2].TimestampSec) + require.Equal(t, uint64(4), results[0].Items[3].TimestampSec) + require.Equal(t, uint32(1), results[0].Items[0].CpuTimeMs) + require.Equal(t, uint32(2), results[0].Items[1].CpuTimeMs) + require.Equal(t, uint32(1), results[0].Items[2].CpuTimeMs) + require.Equal(t, uint32(1), results[0].Items[3].CpuTimeMs) require.Equal(t, []byte("sqlDigest1"), results[1].SqlDigest) require.Equal(t, 5, getTotalCPUTime(results[1])) require.Equal(t, []byte("sqlDigest3"), results[2].SqlDigest) @@ -318,109 +320,26 @@ func TestCollectCapacity(t *testing.T) { variable.TopSQLVariable.MaxCollect.Store(10000) registerSQL(5000) - require.Equal(t, int64(5000), tsr.sqlMapLength.Load()) + require.Equal(t, int64(5000), tsr.normalizedSQLMap.length.Load()) registerPlan(1000) - require.Equal(t, int64(1000), tsr.planMapLength.Load()) + require.Equal(t, int64(1000), tsr.normalizedPlanMap.length.Load()) registerSQL(20000) - require.Equal(t, int64(10000), tsr.sqlMapLength.Load()) + require.Equal(t, int64(10000), tsr.normalizedSQLMap.length.Load()) registerPlan(20000) - require.Equal(t, int64(10000), tsr.planMapLength.Load()) + require.Equal(t, int64(10000), tsr.normalizedPlanMap.length.Load()) variable.TopSQLVariable.MaxCollect.Store(20000) registerSQL(50000) - require.Equal(t, int64(20000), tsr.sqlMapLength.Load()) + require.Equal(t, int64(20000), tsr.normalizedSQLMap.length.Load()) registerPlan(50000) - require.Equal(t, int64(20000), tsr.planMapLength.Load()) + require.Equal(t, int64(20000), tsr.normalizedPlanMap.length.Load()) variable.TopSQLVariable.MaxStatementCount.Store(5000) - collectedData := make(map[string]*dataPoints) - tsr.doCollect(collectedData, 1, genRecord(20000)) - require.Equal(t, 5001, len(collectedData)) - require.Equal(t, int64(20000), tsr.sqlMapLength.Load()) - require.Equal(t, int64(20000), tsr.planMapLength.Load()) -} - -func 
TestCollectOthers(t *testing.T) { - collectTarget := make(map[string]*dataPoints) - addEvictedCPUTime(collectTarget, 1, 10) - addEvictedCPUTime(collectTarget, 2, 20) - addEvictedCPUTime(collectTarget, 3, 30) - others := collectTarget[keyOthers] - require.Equal(t, uint64(60), others.CPUTimeMsTotal) - require.Equal(t, []uint64{1, 2, 3}, others.TimestampList) - require.Equal(t, []uint32{10, 20, 30}, others.CPUTimeMsList) - - others = addEvictedIntoSortedDataPoints(nil, others) - require.Equal(t, uint64(60), others.CPUTimeMsTotal) - - // test for time jump backward. - evict := &dataPoints{} - evict.TimestampList = []uint64{3, 2, 4} - evict.CPUTimeMsList = []uint32{30, 20, 40} - evict.CPUTimeMsTotal = 90 - others = addEvictedIntoSortedDataPoints(others, evict) - require.Equal(t, uint64(150), others.CPUTimeMsTotal) - require.Equal(t, []uint64{1, 2, 3, 4}, others.TimestampList) - require.Equal(t, []uint32{10, 40, 60, 40}, others.CPUTimeMsList) -} - -func TestDataPoints(t *testing.T) { - // test for dataPoints invalid. - d := &dataPoints{} - d.TimestampList = []uint64{1} - d.CPUTimeMsList = []uint32{10, 30} - require.True(t, d.isInvalid()) - - // test for dataPoints sort. - d = &dataPoints{} - d.TimestampList = []uint64{1, 2, 5, 6, 3, 4} - d.CPUTimeMsList = []uint32{10, 20, 50, 60, 30, 40} - sort.Sort(d) - require.Equal(t, []uint64{1, 2, 3, 4, 5, 6}, d.TimestampList) - require.Equal(t, []uint32{10, 20, 30, 40, 50, 60}, d.CPUTimeMsList) - - // test for dataPoints merge. - d = &dataPoints{} - evict := &dataPoints{} - addEvictedIntoSortedDataPoints(d, evict) - evict.TimestampList = []uint64{1, 3} - evict.CPUTimeMsList = []uint32{10, 30} - evict.CPUTimeMsTotal = 40 - addEvictedIntoSortedDataPoints(d, evict) - require.Equal(t, uint64(40), d.CPUTimeMsTotal) - require.Equal(t, []uint64{1, 3}, d.TimestampList) - require.Equal(t, []uint32{10, 30}, d.CPUTimeMsList) - - evict.TimestampList = []uint64{1, 2, 3, 4, 5} - evict.CPUTimeMsList = []uint32{10, 20, 30, 40, 50} - evict.CPUTimeMsTotal = 150 - addEvictedIntoSortedDataPoints(d, evict) - require.Equal(t, uint64(190), d.CPUTimeMsTotal) - require.Equal(t, []uint64{1, 2, 3, 4, 5}, d.TimestampList) - require.Equal(t, []uint32{20, 20, 60, 40, 50}, d.CPUTimeMsList) - - // test for time jump backward. 
- d = &dataPoints{} - evict = &dataPoints{} - evict.TimestampList = []uint64{3, 2} - evict.CPUTimeMsList = []uint32{30, 20} - evict.CPUTimeMsTotal = 50 - addEvictedIntoSortedDataPoints(d, evict) - require.Equal(t, uint64(50), d.CPUTimeMsTotal) - require.Equal(t, []uint64{2, 3}, d.TimestampList) - require.Equal(t, []uint32{20, 30}, d.CPUTimeMsList) - - // test for merge invalid dataPoints - d = &dataPoints{} - evict = &dataPoints{} - evict.TimestampList = []uint64{1} - evict.CPUTimeMsList = []uint32{10, 30} - require.True(t, evict.isInvalid()) - addEvictedIntoSortedDataPoints(d, evict) - require.False(t, d.isInvalid()) - require.Nil(t, d.CPUTimeMsList) - require.Nil(t, d.TimestampList) + tsr.processCPUTimeData(1, genRecord(20000)) + require.Equal(t, 5001, len(tsr.collecting.records)) + require.Equal(t, int64(20000), tsr.normalizedSQLMap.length.Load()) + require.Equal(t, int64(20000), tsr.normalizedPlanMap.length.Load()) } func TestCollectInternal(t *testing.T) { @@ -482,17 +401,17 @@ func TestMultipleDataSinks(t *testing.T) { records := []tracecpu.SQLCPUTimeRecord{ newSQLCPUTimeRecord(tsr, 1, 2), } - tsr.Collect(3, records) + tsr.processCPUTimeData(3, records) for _, ch := range chs { d := <-ch require.NotNil(t, d) - require.Equal(t, []tipb.CPUTimeRecord{{ - SqlDigest: []byte("sqlDigest1"), - PlanDigest: []byte("planDigest1"), - RecordListTimestampSec: []uint64{3}, - RecordListCpuTimeMs: []uint32{2}, - }}, d.CPUTimeRecords) + require.Len(t, d.DataRecords, 1) + require.Equal(t, []byte("sqlDigest1"), d.DataRecords[0].SqlDigest) + require.Equal(t, []byte("planDigest1"), d.DataRecords[0].PlanDigest) + require.Len(t, d.DataRecords[0].Items, 1) + require.Equal(t, uint64(3), d.DataRecords[0].Items[0].TimestampSec) + require.Equal(t, uint32(2), d.DataRecords[0].Items[0].CpuTimeMs) require.Equal(t, []tipb.SQLMeta{{ SqlDigest: []byte("sqlDigest1"), @@ -513,17 +432,17 @@ func TestMultipleDataSinks(t *testing.T) { records = []tracecpu.SQLCPUTimeRecord{ newSQLCPUTimeRecord(tsr, 4, 5), } - tsr.Collect(6, records) + tsr.processCPUTimeData(6, records) for i := 1; i < 7; i += 2 { d := <-chs[i] require.NotNil(t, d) - require.Equal(t, []tipb.CPUTimeRecord{{ - SqlDigest: []byte("sqlDigest4"), - PlanDigest: []byte("planDigest4"), - RecordListTimestampSec: []uint64{6}, - RecordListCpuTimeMs: []uint32{5}, - }}, d.CPUTimeRecords) + require.Len(t, d.DataRecords, 1) + require.Equal(t, []byte("sqlDigest4"), d.DataRecords[0].SqlDigest) + require.Equal(t, []byte("planDigest4"), d.DataRecords[0].PlanDigest) + require.Len(t, d.DataRecords[0].Items, 1) + require.Equal(t, uint64(6), d.DataRecords[0].Items[0].TimestampSec) + require.Equal(t, uint32(5), d.DataRecords[0].Items[0].CpuTimeMs) require.Equal(t, []tipb.SQLMeta{{ SqlDigest: []byte("sqlDigest4"), diff --git a/util/topsql/reporter/single_target.go b/util/topsql/reporter/single_target.go index 3744702ba26d6..05d0791dfe069 100644 --- a/util/topsql/reporter/single_target.go +++ b/util/topsql/reporter/single_target.go @@ -30,6 +30,12 @@ import ( "google.golang.org/grpc/backoff" ) +const ( + dialTimeout = 5 * time.Second + grpcInitialWindowSize = 1 << 30 + grpcInitialConnWindowSize = 1 << 30 +) + // SingleTargetDataSink reports data to grpc servers. 
type SingleTargetDataSink struct { ctx context.Context @@ -207,7 +213,7 @@ func (ds *SingleTargetDataSink) doSend(addr string, task sendTask) { }() go func() { defer wg.Done() - errCh <- ds.sendBatchCPUTimeRecord(ctx, task.data.CPUTimeRecords) + errCh <- ds.sendBatchTopSQLRecord(ctx, task.data.DataRecords) }() wg.Wait() close(errCh) @@ -218,8 +224,8 @@ func (ds *SingleTargetDataSink) doSend(addr string, task sendTask) { } } -// sendBatchCPUTimeRecord sends a batch of TopSQL records by stream. -func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, records []tipb.CPUTimeRecord) (err error) { +// sendBatchTopSQLRecord sends a batch of TopSQL records by stream. +func (ds *SingleTargetDataSink) sendBatchTopSQLRecord(ctx context.Context, records []tipb.TopSQLRecord) (err error) { if len(records) == 0 { return nil } @@ -236,7 +242,7 @@ func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, reco }() client := tipb.NewTopSQLAgentClient(ds.conn) - stream, err := client.ReportCPUTimeRecords(ctx) + stream, err := client.ReportTopSQLRecords(ctx) if err != nil { return err } diff --git a/util/topsql/stmtstats/aggregator.go b/util/topsql/stmtstats/aggregator.go index d78ed7b62dafb..648b00dd16948 100644 --- a/util/topsql/stmtstats/aggregator.go +++ b/util/topsql/stmtstats/aggregator.go @@ -25,12 +25,6 @@ import ( // globalAggregator is global *aggregator. var globalAggregator = newAggregator() -// StatementStatsRecord is the merged StatementStatsMap with timestamp. -type StatementStatsRecord struct { - Timestamp int64 - Data StatementStatsMap -} - // aggregator is used to collect and aggregate data from all StatementStats. // It is responsible for collecting data from all StatementStats, aggregating // them together, uploading them and regularly cleaning up the closed StatementStats. @@ -69,22 +63,21 @@ func (m *aggregator) run() { // aggregate data from all associated StatementStats. // If StatementStats has been closed, collect will remove it from the map. func (m *aggregator) aggregate() { - r := StatementStatsRecord{ - Timestamp: time.Now().Unix(), - Data: StatementStatsMap{}, - } + total := StatementStatsMap{} m.statsSet.Range(func(statsR, _ interface{}) bool { stats := statsR.(*StatementStats) if stats.Finished() { m.unregister(stats) } - r.Data.Merge(stats.Take()) - return true - }) - m.collectors.Range(func(c, _ interface{}) bool { - c.(Collector).CollectStmtStatsRecords([]StatementStatsRecord{r}) + total.Merge(stats.Take()) return true }) + if len(total) > 0 { + m.collectors.Range(func(c, _ interface{}) bool { + c.(Collector).CollectStmtStatsMap(total) + return true + }) + } } // register binds StatementStats to aggregator. @@ -149,8 +142,8 @@ func UnregisterCollector(collector Collector) { globalAggregator.unregisterCollector(collector) } -// Collector is used to collect StatementStatsRecord. +// Collector is used to collect StatementStatsMap. type Collector interface { - // CollectStmtStatsRecords is used to collect list of StatementStatsRecord. - CollectStmtStatsRecords([]StatementStatsRecord) + // CollectStmtStatsMap is used to collect StatementStatsMap. 
+ CollectStmtStatsMap(StatementStatsMap) } diff --git a/util/topsql/stmtstats/aggregator_test.go b/util/topsql/stmtstats/aggregator_test.go index 24a72bb89131d..61e7dbf07ca33 100644 --- a/util/topsql/stmtstats/aggregator_test.go +++ b/util/topsql/stmtstats/aggregator_test.go @@ -38,7 +38,7 @@ func Test_RegisterUnregisterCollector(t *testing.T) { SetupAggregator() defer CloseAggregator() time.Sleep(100 * time.Millisecond) - collector := newMockCollector(func(records []StatementStatsRecord) {}) + collector := newMockCollector(func(data StatementStatsMap) {}) RegisterCollector(collector) _, ok := globalAggregator.collectors.Load(collector) assert.True(t, ok) @@ -55,13 +55,13 @@ func Test_aggregator_register_collect(t *testing.T) { } a.register(stats) stats.OnExecutionBegin([]byte("SQL-1"), []byte("")) - var records []StatementStatsRecord - a.registerCollector(newMockCollector(func(rs []StatementStatsRecord) { - records = append(records, rs...) + total := StatementStatsMap{} + a.registerCollector(newMockCollector(func(data StatementStatsMap) { + total.Merge(data) })) a.aggregate() - assert.NotEmpty(t, records) - assert.Equal(t, uint64(1), records[0].Data[SQLPlanDigest{SQLDigest: "SQL-1"}].ExecCount) + assert.NotEmpty(t, total) + assert.Equal(t, uint64(1), total[SQLPlanDigest{SQLDigest: "SQL-1"}].ExecCount) } func Test_aggregator_run_close(t *testing.T) { @@ -81,13 +81,13 @@ func Test_aggregator_run_close(t *testing.T) { } type mockCollector struct { - f func(records []StatementStatsRecord) + f func(data StatementStatsMap) } -func newMockCollector(f func(records []StatementStatsRecord)) Collector { +func newMockCollector(f func(data StatementStatsMap)) Collector { return &mockCollector{f: f} } -func (c *mockCollector) CollectStmtStatsRecords(records []StatementStatsRecord) { - c.f(records) +func (c *mockCollector) CollectStmtStatsMap(data StatementStatsMap) { + c.f(data) } diff --git a/util/topsql/stmtstats/stmtstatstest/stmtstats_test.go b/util/topsql/stmtstats/stmtstatstest/stmtstats_test.go index d37de52178e0e..09f830a0ec9a3 100644 --- a/util/topsql/stmtstats/stmtstatstest/stmtstats_test.go +++ b/util/topsql/stmtstats/stmtstatstest/stmtstats_test.go @@ -39,12 +39,10 @@ func TestExecCount(t *testing.T) { // Register stmt stats collector. var mu sync.Mutex total := stmtstats.StatementStatsMap{} - stmtstats.RegisterCollector(newMockCollector(func(rs []stmtstats.StatementStatsRecord) { + stmtstats.RegisterCollector(newMockCollector(func(data stmtstats.StatementStatsMap) { mu.Lock() defer mu.Unlock() - for _, r := range rs { - total.Merge(r.Data) - } + total.Merge(data) })) // Create mock store. 
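// Sketch: the smallest stmtstats.Collector under the new interface -- the same
// shape as the mockCollector in the tests above; merging into a local map is an
// assumption for illustration.
package sketch

import "github.com/pingcap/tidb/util/topsql/stmtstats"

type memCollector struct {
	total stmtstats.StatementStatsMap
}

// CollectStmtStatsMap implements stmtstats.Collector by merging every batch
// into one in-memory map.
func (c *memCollector) CollectStmtStatsMap(data stmtstats.StatementStatsMap) {
	if c.total == nil {
		c.total = stmtstats.StatementStatsMap{}
	}
	c.total.Merge(data)
}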
@@ -138,13 +136,13 @@ func TestExecCount(t *testing.T) { } type mockCollector struct { - f func(records []stmtstats.StatementStatsRecord) + f func(data stmtstats.StatementStatsMap) } -func newMockCollector(f func(records []stmtstats.StatementStatsRecord)) stmtstats.Collector { +func newMockCollector(f func(data stmtstats.StatementStatsMap)) stmtstats.Collector { return &mockCollector{f: f} } -func (c *mockCollector) CollectStmtStatsRecords(records []stmtstats.StatementStatsRecord) { - c.f(records) +func (c *mockCollector) CollectStmtStatsMap(data stmtstats.StatementStatsMap) { + c.f(data) } diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go index ccc1f61eef0a5..cb1199a00c02a 100644 --- a/util/topsql/topsql.go +++ b/util/topsql/topsql.go @@ -53,6 +53,7 @@ func SetupTopSQL() { tracecpu.GlobalSQLCPUProfiler.SetCollector(remoteReporter) tracecpu.GlobalSQLCPUProfiler.Run() + stmtstats.RegisterCollector(remoteReporter) stmtstats.SetupAggregator() } diff --git a/util/topsql/topsql_test.go b/util/topsql/topsql_test.go index 462c6ffb70aa2..a8d589f073df1 100644 --- a/util/topsql/topsql_test.go +++ b/util/topsql/topsql_test.go @@ -162,8 +162,8 @@ func TestTopSQLReporter(t *testing.T) { records := server.GetLatestRecords() checkSQLPlanMap := map[string]struct{}{} for _, req := range records { - require.Greater(t, len(req.RecordListCpuTimeMs), 0) - require.Greater(t, req.RecordListCpuTimeMs[0], uint32(0)) + require.Greater(t, len(req.Items), 0) + require.Greater(t, req.Items[0].CpuTimeMs, uint32(0)) sqlMeta, exist := server.GetSQLMetaByDigestBlocking(req.SqlDigest, time.Second) require.True(t, exist) expectedNormalizedSQL, exist := sqlMap[string(req.SqlDigest)] @@ -279,7 +279,7 @@ func TestTopSQLPubSub(t *testing.T) { sqlMetas := make(map[string]*tipb.SQLMeta) planMetas := make(map[string]string) - records := make(map[string]*tipb.CPUTimeRecord) + records := make(map[string]*tipb.TopSQLRecord) for { r, err := stream.Recv() @@ -292,12 +292,11 @@ func TestTopSQLPubSub(t *testing.T) { if _, ok := records[string(rec.SqlDigest)]; !ok { records[string(rec.SqlDigest)] = rec } else { - cpu := records[string(rec.SqlDigest)] + record := records[string(rec.SqlDigest)] if rec.PlanDigest != nil { - cpu.PlanDigest = rec.PlanDigest + record.PlanDigest = rec.PlanDigest } - cpu.RecordListTimestampSec = append(cpu.RecordListTimestampSec, rec.RecordListTimestampSec...) - cpu.RecordListCpuTimeMs = append(cpu.RecordListCpuTimeMs, rec.RecordListCpuTimeMs...) + record.Items = append(record.Items, rec.Items...) 
} } else if r.GetSqlMeta() != nil { sql := r.GetSqlMeta() @@ -315,8 +314,8 @@ func TestTopSQLPubSub(t *testing.T) { checkSQLPlanMap := map[string]struct{}{} for i := range records { record := records[i] - require.Greater(t, len(record.RecordListCpuTimeMs), 0) - require.Greater(t, record.RecordListCpuTimeMs[0], uint32(0)) + require.Greater(t, len(record.Items), 0) + require.Greater(t, record.Items[0].CpuTimeMs, uint32(0)) sqlMeta, exist := sqlMetas[string(record.SqlDigest)] require.True(t, exist) expectedNormalizedSQL, exist := digest2sql[string(record.SqlDigest)] diff --git a/util/topsql/tracecpu/mock/mock.go b/util/topsql/tracecpu/mock/mock.go index 62125d202f67c..3d240e588008d 100644 --- a/util/topsql/tracecpu/mock/mock.go +++ b/util/topsql/tracecpu/mock/mock.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/topsql/stmtstats" "github.com/pingcap/tidb/util/topsql/tracecpu" "go.uber.org/atomic" "go.uber.org/zap" @@ -49,7 +50,7 @@ func NewTopSQLCollector() *TopSQLCollector { } // Collect uses for testing. -func (c *TopSQLCollector) Collect(ts uint64, stats []tracecpu.SQLCPUTimeRecord) { +func (c *TopSQLCollector) Collect(stats []tracecpu.SQLCPUTimeRecord) { defer c.collectCnt.Inc() if len(stats) == 0 { return @@ -73,6 +74,9 @@ func (c *TopSQLCollector) Collect(ts uint64, stats []tracecpu.SQLCPUTimeRecord) } } +// CollectStmtStatsMap implements stmtstats.Collector. +func (c *TopSQLCollector) CollectStmtStatsMap(_ stmtstats.StatementStatsMap) {} + // GetSQLStatsBySQLWithRetry uses for testing. func (c *TopSQLCollector) GetSQLStatsBySQLWithRetry(sql string, planIsNotNull bool) []*tracecpu.SQLCPUTimeRecord { after := time.After(time.Second * 10) diff --git a/util/topsql/tracecpu/profile.go b/util/topsql/tracecpu/profile.go index b08569d147eec..7b5063419146a 100644 --- a/util/topsql/tracecpu/profile.go +++ b/util/topsql/tracecpu/profile.go @@ -47,8 +47,7 @@ var GlobalSQLCPUProfiler = newSQLCPUProfiler() // Collector uses to collect SQL execution cpu time. type Collector interface { // Collect uses to collect the SQL execution cpu time. - // ts is a Unix time, unit is second. - Collect(ts uint64, stats []SQLCPUTimeRecord) + Collect(stats []SQLCPUTimeRecord) } // SQLCPUTimeRecord represents a single record of how much cpu time a sql plan consumes in one second. @@ -129,10 +128,6 @@ func (sp *sqlCPUProfiler) doCPUProfile() { ns := int64(time.Second)*intervalSecond - int64(time.Now().Nanosecond()) time.Sleep(time.Nanosecond * time.Duration(ns)) pprof.StopCPUProfile() - task.end = time.Now().Unix() - if task.end < 0 { - task.end = 0 - } sp.taskCh <- task } @@ -149,7 +144,7 @@ func (sp *sqlCPUProfiler) startAnalyzeProfileWorker() { stats := sp.parseCPUProfileBySQLLabels(p) sp.handleExportProfileTask(p) if c := sp.GetCollector(); c != nil { - c.Collect(uint64(task.end), stats) + c.Collect(stats) } sp.putTaskToBuffer(task) } @@ -157,7 +152,6 @@ func (sp *sqlCPUProfiler) startAnalyzeProfileWorker() { type profileData struct { buf *bytes.Buffer - end int64 } func (sp *sqlCPUProfiler) newProfileTask() *profileData {
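// Sketch: what the tracecpu.Collector change above means for implementers --
// Collect no longer receives a timestamp, since the reporter now stamps records
// itself (see nowFunc in takeDataFromCollectChanBuffer); the logging collector
// below is an assumption for illustration.
package sketch

import (
	"fmt"

	"github.com/pingcap/tidb/util/topsql/tracecpu"
)

type printCollector struct{}

// Collect implements tracecpu.Collector with the new, timestamp-free signature.
func (printCollector) Collect(stats []tracecpu.SQLCPUTimeRecord) {
	for _, r := range stats {
		fmt.Printf("sql=%x plan=%x cpu=%dms\n", r.SQLDigest, r.PlanDigest, r.CPUTimeMs)
	}
}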