Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_control: support dynamic calibrate resource #43098

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ go_library(
"//parser/ast",
"//parser/auth",
"//parser/charset",
"//parser/duration",
"//parser/format",
"//parser/model",
"//parser/mysql",
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
return &calibrateResourceExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), 0),
workloadType: s.Tp,
optionList: s.DynamicCalibrateResourceOptionList,
}
case *ast.LoadDataActionStmt:
return &LoadDataActionExec{
Expand Down
194 changes: 192 additions & 2 deletions executor/calibrate_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,23 @@ package executor

import (
"context"
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/duration"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/oracle"
)

// workloadBaseRUCostMap contains the base resource cost rate per 1 kv cpu within 1 second,
Expand Down Expand Up @@ -83,13 +91,83 @@ type baseResourceCost struct {
writeReqCount uint64
}

const (
valuableUsageThreshold = 0.2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments for these constants

lowUsageThreshold = 0.1
percentOfPass = 0.9
discardRate = 0.1

maxDuration = time.Hour * 24
minDuration = time.Minute * 10
)

type calibrateResourceExec struct {
baseExecutor

optionList []*ast.DynamicCalibrateResourceOption
workloadType ast.CalibrateResourceType
done bool
}

func (e *calibrateResourceExec) checkDynamicCalibrateOptions() (string, string, error) {
var startTime, endTime time.Time
var startTimeStr, endTimeStr string
checkDurationFn := func() error {
// check the duration
duration := endTime.Sub(startTime)
if duration > maxDuration {
return errors.Errorf("the duration of calibration is too long")
}
if duration < minDuration {
return errors.Errorf("the duration of calibration is too short")
}
return nil
}

if len(e.optionList) == 2 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to replace this if-else with something like following:

var start, end, dur *ptr
for _, op := range e.optionList[0] {
  ...
}
if duration == nil { ...default for duration}
if start == nil { ...default-for-start }
if end == nil { ...default-for-end }

validate_start_end_duration()

...rest logics

And Since The static and dynamic branch have little common logic with each other, please wrap both of them with a separate function to avoid too long if..else.. block

if e.optionList[0].Tp != ast.CalibrateStartTime || (e.optionList[1].Tp != ast.CalibrateEndTime && e.optionList[1].Tp != ast.CalibrateDuration) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.optionList[1].Tp != ast.CalibrateEndTime && e.optionList[1].Tp != ast.CalibrateDuration
I'm not sure why the same parameter would have && judgment twice

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we're trying to determine if it's wrong. The expression to assert truth is e.optionList[1].Tp == ast.CalibrateEndTime || e.optionList[1].Tp == ast.CalibrateDuration

return "", "", errors.Errorf("dynamic calibarate options are not matched, please input start time and end time or duration is optional")
}
// check start time and end time whether validated
startTs, err := staleread.CalculateAsOfTsExpr(e.ctx, e.optionList[0].Ts)
if err != nil {
return "", "", err
}
startTime = oracle.GetTimeFromTS(startTs)
if e.optionList[1].Tp == ast.CalibrateEndTime {
endTs, err := staleread.CalculateAsOfTsExpr(e.ctx, e.optionList[1].Ts)
if err != nil {
return "", "", err
}
endTime = oracle.GetTimeFromTS(endTs)
} else {
duration, err := duration.ParseDuration(e.optionList[1].StrValue)
if err != nil {
return "", "", err
}
endTime = startTime.Add(duration)
}
} else if len(e.optionList) == 1 {
if e.optionList[0].Tp != ast.CalibrateStartTime {
return "", "", errors.Errorf("dynamic calibarate options are not matched, please input start time and end time is optional")
}
// check start time whether validated
startTs, err := staleread.CalculateAsOfTsExpr(e.ctx, e.optionList[0].Ts)
if err != nil {
return "", "", err
}
startTime = oracle.GetTimeFromTS(startTs)
endTime = time.Now()
} else {
return "", "", errors.Errorf("dynamic calibarate options are too much")
}
startTimeStr = startTime.In(e.ctx.GetSessionVars().Location()).Format("2006-01-02 15:04:05")
endTimeStr = endTime.In(e.ctx.GetSessionVars().Location()).Format("2006-01-02 15:04:05")
if err := checkDurationFn(); err != nil {
return "", "", err
}
return startTimeStr, endTimeStr, nil
}

func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.done {
Expand All @@ -99,7 +177,95 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro

exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
if len(e.optionList) > 0 && e.workloadType != ast.WorkloadNone {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe can put this check in dynamicCalibrate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update it

return errors.Errorf("dynamic and static calibration cannot be performed at the same time")
}
if len(e.optionList) > 0 {
return e.dynamicCalibrate(ctx, req, exec)
}
return e.staticCalibrate(ctx, req, exec)
}

func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
startTime, endTime, err := e.checkDynamicCalibrateOptions()
if err != nil {
return err
}
totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
if err != nil {
return err
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
if err != nil {
return err
}
rus, err := getRUPerSec(ctx, exec, startTime, endTime)
if err != nil {
return err
}
tikvCPUs, err := getComponentCPUUsagePerSec(ctx, exec, "tikv", startTime, endTime)
if err != nil {
return err
}
tidbCPUs, err := getComponentCPUUsagePerSec(ctx, exec, "tidb", startTime, endTime)
if err != nil {
return err
}
quotas := make([]float64, 0)
lowCount := 0
tidbCPULowCount := 0
tikvCPULowCOunt := 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe can reduce the initial number of rows

Suggested change
lowCount := 0
tidbCPULowCount := 0
tikvCPULowCOunt := 0
lowCount, tidbCPULowCount, tikvCPULowCount := 0, 0, 0

BTW, tikvCPULowCOunt has a extra big O

for idx, ru := range rus {
if idx >= len(tikvCPUs) || idx >= len(tidbCPUs) {
break
}
tikvQuota := tikvCPUs[idx] / totalKVCPUQuota
tidbQuota := tidbCPUs[idx] / totalTiDBCPU
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

// If one of the two cpu usage is greater than the `valuableUsageThreshold`, we can accept it.
// And if both are greater than the `lowUsageThreshold`, we can also accpet it.
if tikvQuota > valuableUsageThreshold || tidbQuota > valuableUsageThreshold {
quotas = append(quotas, ru/mathutil.Max(tikvQuota, tidbQuota))
} else if tikvQuota < lowUsageThreshold {
lowCount++
tikvCPULowCOunt++
if tidbQuota < lowUsageThreshold {
tidbCPULowCount++
}
} else if tidbQuota < lowUsageThreshold {
lowCount++
tidbCPULowCount++
} else {
quotas = append(quotas, ru/mathutil.Max(tikvQuota, tidbQuota))
}
}
if len(quotas) < 5 {
return errors.Errorf("there are too few metrics points available")
}
if float64(len(quotas))/float64(len(quotas)+lowCount) > percentOfPass {
sort.Slice(quotas, func(i, j int) bool {
return quotas[i] > quotas[j]
})
lowerBound := int(math.Round(float64(len(quotas)) * float64(discardRate)))
upperBound := len(quotas) - lowerBound
sum := 0.
for i := lowerBound; i < upperBound; i++ {
sum += quotas[i]
}
quota := sum / float64(upperBound-lowerBound)
req.AppendUint64(0, uint64(quota))
} else {
if tidbCPULowCount > 0 && tikvCPULowCOunt > 0 {
return errors.Errorf("The CPU utilizations of TiDB and TiKV are less than one tenth in some of the time")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest msg
"The workload in selected time window is too low, with which TiDB is unable to reach a capacity estimation. Please select another time window with higher workload, or calibrate resource by hardware instead. "

} else if tidbCPULowCount > 0 {
return errors.Errorf("The CPU utilization of TiDB is less than one tenth in some of the time")
} else {
return errors.Errorf("The CPU utilization of TiKV is less than one tenth in some of the time")
}
}
return nil
}

func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
// first fetch the ru settings config.
ruCfg, err := getRUSettings(ctx, exec)
if err != nil {
Expand Down Expand Up @@ -134,7 +300,6 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro
ruCfg.writeCostPerByte*float64(baseCost.writeBytes)
quota := totalKVCPUQuota * ruPerKVCPU
req.AppendUint64(0, uint64(quota))

return nil
}

Expand Down Expand Up @@ -199,6 +364,16 @@ func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecuto
return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs")
}

func getRUPerSec(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) ([]float64, error) {
query := fmt.Sprintf("SELECT value FROM METRICS_SCHEMA.resource_manager_resource_unit where time >= '%s' and time <= '%s' ORDER BY time desc", startTime, endTime)
return getValuesFromMetrics(ctx, exec, query, "resource_manager_resource_unit")
}

func getComponentCPUUsagePerSec(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, component, startTime, endTime string) ([]float64, error) {
query := fmt.Sprintf("SELECT sum(value) FROM METRICS_SCHEMA.process_cpu_usage where time >= '%s' and time <= '%s' and job like '%%%s' GROUP BY time ORDER BY time desc", startTime, endTime, component)
return getValuesFromMetrics(ctx, exec, query, "process_cpu_usage")
}

func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (float64, error) {
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query)
if err != nil {
Expand All @@ -210,3 +385,18 @@ func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecuto

return rows[0].GetFloat64(0), nil
}

func getValuesFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) ([]float64, error) {
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query)
if err != nil {
return nil, errors.Trace(err)
}
if len(rows) == 0 {
return nil, errors.Errorf("metrics '%s' is empty", metrics)
}
ret := make([]float64, 0, len(rows))
for _, row := range rows {
ret = append(ret, row.GetFloat64(0))
}
return ret, nil
}
Loading