Skip to content

Commit

Permalink
topsql: introduce datasink interface (#30662)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongzc authored Dec 14, 2021
1 parent c30d34f commit e9b1fb8
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 75 deletions.
2 changes: 1 addition & 1 deletion server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ func TestTopSQLAgent(t *testing.T) {
dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;")
dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;")

r := reporter.NewRemoteTopSQLReporter(reporter.NewGRPCReportClient(plancodec.DecodeNormalizedPlan))
r := reporter.NewRemoteTopSQLReporter(reporter.NewSingleTargetDataSink(plancodec.DecodeNormalizedPlan))
tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{r})

// TODO: change to ensure that the right sql statements are reported, not just counts
Expand Down
36 changes: 36 additions & 0 deletions util/topsql/reporter/datasink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reporter

import "time"

// DataSink collects and sends data to a target.
type DataSink interface {
	// TrySend pushes report data into the sink, which will later be sent to a target by the sink.
	// A deadline can be specified to control how late the data may be sent. If the sink stays full
	// and cannot schedule a send within the specified deadline, or if the sink is closed, an error
	// is returned.
	TrySend(data ReportData, deadline time.Time) error

	// IsPaused indicates that the DataSink is not expecting to receive records for now
	// and may resume in the future.
	IsPaused() bool

	// IsDown indicates that the DataSink has gone down and can be cleared.
	// Note: once a DataSink is down, it never comes back up.
	IsDown() bool

	// Close cleans up resources owned by this DataSink.
	Close()
}
40 changes: 16 additions & 24 deletions util/topsql/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -118,9 +117,9 @@ type planBinaryDecodeFunc func(string) (string, error)
// RemoteTopSQLReporter implements a TopSQL reporter that sends data to a remote agent
// This should be called periodically to collect TopSQL resource usage metrics
type RemoteTopSQLReporter struct {
ctx context.Context
cancel context.CancelFunc
client ReportClient
ctx context.Context
cancel context.CancelFunc
dataSink DataSink

// normalizedSQLMap is an map, whose keys are SQL digest strings and values are SQLMeta.
normalizedSQLMap atomic.Value // sync.Map
Expand All @@ -145,12 +144,12 @@ type SQLMeta struct {
//
// planBinaryDecoder is a decoding function which will be called asynchronously to decode the plan binary to string
// MaxStatementsNum is the maximum SQL and plan number, which will restrict the memory usage of the internal LFU cache
func NewRemoteTopSQLReporter(client ReportClient) *RemoteTopSQLReporter {
func NewRemoteTopSQLReporter(dataSink DataSink) *RemoteTopSQLReporter {
ctx, cancel := context.WithCancel(context.Background())
tsr := &RemoteTopSQLReporter{
ctx: ctx,
cancel: cancel,
client: client,
dataSink: dataSink,
collectCPUDataChan: make(chan cpuData, 1),
reportCollectedDataChan: make(chan collectedData, 1),
}
Expand Down Expand Up @@ -238,7 +237,7 @@ func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQ
// Close closes the reporter and releases the resources it owns.
func (tsr *RemoteTopSQLReporter) Close() {
tsr.cancel()
tsr.client.Close()
tsr.dataSink.Close()
}

func addEvictedCPUTime(collectTarget map[string]*dataPoints, timestamp uint64, totalCPUTimeMs uint32) {
Expand Down Expand Up @@ -450,15 +449,15 @@ type collectedData struct {
normalizedPlanMap *sync.Map
}

// reportData contains data that reporter sends to the agent
type reportData struct {
// ReportData contains data that the reporter sends to the agent.
type ReportData struct {
	// collectedData contains the topN collected records and the `others` record,
	// which aggregates all records that fall outside the Top N.
	collectedData []*dataPoints
	// normalizedSQLMap maps SQL digest strings to SQLMeta (see the field of the same
	// name on RemoteTopSQLReporter).
	normalizedSQLMap *sync.Map
	// normalizedPlanMap maps plan digest strings to normalized plan binaries, decoded
	// asynchronously by the sink's plan decoder.
	normalizedPlanMap *sync.Map
}

func (d *reportData) hasData() bool {
func (d *ReportData) hasData() bool {
if len(d.collectedData) > 0 {
return true
}
Expand Down Expand Up @@ -496,9 +495,9 @@ func (tsr *RemoteTopSQLReporter) reportWorker() {
}
}

// getReportData gets reportData from the collectedData.
// getReportData gets ReportData from the collectedData.
// This function will calculate the topN collected records and the `others` record, which aggregates all records that fall outside the Top N.
func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) reportData {
func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportData {
// Fetch TopN dataPoints.
others := collected.records[keyOthers]
delete(collected.records, keyOthers)
Expand All @@ -524,21 +523,20 @@ func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) reportDa
records = append(records, others)
}

return reportData{
return ReportData{
collectedData: records,
normalizedSQLMap: collected.normalizedSQLMap,
normalizedPlanMap: collected.normalizedPlanMap,
}
}

func (tsr *RemoteTopSQLReporter) doReport(data reportData) {
func (tsr *RemoteTopSQLReporter) doReport(data ReportData) {
defer util.Recover("top-sql", "doReport", nil, false)

if !data.hasData() {
return
}

agentAddr := config.GetGlobalConfig().TopSQL.ReceiverAddress
timeout := reportTimeout
failpoint.Inject("resetTimeoutForTest", func(val failpoint.Value) {
if val.(bool) {
Expand All @@ -548,14 +546,8 @@ func (tsr *RemoteTopSQLReporter) doReport(data reportData) {
}
}
})
ctx, cancel := context.WithTimeout(tsr.ctx, timeout)
start := time.Now()
err := tsr.client.Send(ctx, agentAddr, data)
if err != nil {
logutil.BgLogger().Warn("[top-sql] client failed to send data", zap.Error(err))
reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds())
} else {
reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds())
deadline := time.Now().Add(timeout)
if err := tsr.dataSink.TrySend(data, deadline); err != nil {
logutil.BgLogger().Warn("[top-sql] failed to send data to datasink", zap.Error(err))
}
cancel()
}
2 changes: 1 addition & 1 deletion util/topsql/reporter/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) *Rem
conf.TopSQL.ReceiverAddress = addr
})

rc := NewGRPCReportClient(mockPlanBinaryDecoderFunc)
rc := NewSingleTargetDataSink(mockPlanBinaryDecoderFunc)
ts := NewRemoteTopSQLReporter(rc)
return ts
}
Expand Down
Loading

0 comments on commit e9b1fb8

Please sign in to comment.