Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support time function for calibrate resource #44965

Merged
merged 13 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
"bind.go",
"brie.go",
"builder.go",
"calibrate_resource.go",
"change.go",
"checksum.go",
"compact_table.go",
Expand Down Expand Up @@ -121,6 +120,7 @@ go_library(
"//executor/asyncloaddata",
"//executor/importer",
"//executor/internal/builder",
"//executor/internal/calibrateresource",
"//executor/internal/exec",
"//executor/internal/mpp",
"//executor/internal/pdhelper",
Expand All @@ -139,7 +139,6 @@ go_library(
"//parser/ast",
"//parser/auth",
"//parser/charset",
"//parser/duration",
"//parser/format",
"//parser/model",
"//parser/mysql",
Expand Down Expand Up @@ -290,7 +289,6 @@ go_test(
"batch_point_get_test.go",
"benchmark_test.go",
"brie_test.go",
"calibrate_resource_test.go",
"charset_test.go",
"chunk_size_control_test.go",
"cluster_table_test.go",
Expand Down Expand Up @@ -476,8 +474,6 @@ go_test(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//resource_group/controller",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
Expand Down
7 changes: 4 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/executor/internal/builder"
"github.com/pingcap/tidb/executor/internal/calibrateresource"
"github.com/pingcap/tidb/executor/internal/exec"
"github.com/pingcap/tidb/executor/internal/pdhelper"
executor_metrics "github.com/pingcap/tidb/executor/metrics"
Expand Down Expand Up @@ -907,10 +908,10 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) exec.Executor {
}
}
case *ast.CalibrateResourceStmt:
return &calibrateResourceExec{
return &calibrateresource.Executor{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), 0),
workloadType: s.Tp,
optionList: s.DynamicCalibrateResourceOptionList,
WorkloadType: s.Tp,
OptionList: s.DynamicCalibrateResourceOptionList,
}
case *ast.LoadDataActionStmt:
return &LoadDataActionExec{
Expand Down
45 changes: 45 additions & 0 deletions executor/internal/calibrateresource/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "calibrateresource",
srcs = ["calibrate_resource.go"],
importpath = "github.com/pingcap/tidb/executor/internal/calibrateresource",
visibility = ["//executor:__subpackages__"],
deps = [
"//domain",
"//executor/internal/exec",
"//infoschema",
"//kv",
"//parser/ast",
"//parser/duration",
"//parser/model",
"//sessionctx",
"//sessionctx/variable",
"//sessiontxn/staleread",
"//util/chunk",
"//util/mathutil",
"//util/sqlexec",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//oracle",
],
)

go_test(
name = "calibrateresource_test",
timeout = "short",
srcs = ["calibrate_resource_test.go"],
flaky = True,
deps = [
"//domain",
"//parser/mysql",
"//testkit",
"//types",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//resource_group/controller",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package executor
package calibrateresource

import (
"context"
Expand All @@ -23,12 +23,14 @@ import (

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/internal/exec"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/duration"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn/staleread"
Expand Down Expand Up @@ -113,48 +115,106 @@ const (
minDuration = time.Minute * 10
)

type calibrateResourceExec struct {
// Executor is the executor of the CALIBRATE RESOURCE statement.
type Executor struct {
exec.BaseExecutor
optionList []*ast.DynamicCalibrateResourceOption
workloadType ast.CalibrateResourceType
OptionList []*ast.DynamicCalibrateResourceOption
WorkloadType ast.CalibrateResourceType
done bool
}

func (e *calibrateResourceExec) parseCalibrateDuration(ctx context.Context) (startTime time.Time, endTime time.Time, err error) {
// parseTsExpr evaluates tsExpr with staleread.CalculateAsOfTsExpr, using the
// executor's session context, and converts the resulting timestamp into a
// time.Time with oracle.GetTimeFromTS.
// If the evaluation fails, it returns the zero time.Time together with the error.
func (e *Executor) parseTsExpr(ctx context.Context, tsExpr ast.ExprNode) (time.Time, error) {
ts, err := staleread.CalculateAsOfTsExpr(ctx, e.Ctx(), tsExpr)
if err != nil {
return time.Time{}, err
}
return oracle.GetTimeFromTS(ts), nil
}

func (e *Executor) parseCalibrateDuration(ctx context.Context) (startTime time.Time, endTime time.Time, err error) {
var dur time.Duration
var ts uint64
for _, op := range e.optionList {
// startTimeExpr and endTimeExpr keep the original time expressions so that, when the duration begins with `interval`, the missing startTime or endTime can be calculated by a DATE_SUB/DATE_ADD FuncCallExpr.
var startTimeExpr ast.ExprNode
var endTimeExpr ast.ExprNode
for _, op := range e.OptionList {
switch op.Tp {
case ast.CalibrateStartTime:
ts, err = staleread.CalculateAsOfTsExpr(ctx, e.Ctx(), op.Ts)
startTimeExpr = op.Ts
startTime, err = e.parseTsExpr(ctx, startTimeExpr)
if err != nil {
return
}
startTime = oracle.GetTimeFromTS(ts)
case ast.CalibrateEndTime:
ts, err = staleread.CalculateAsOfTsExpr(ctx, e.Ctx(), op.Ts)
endTimeExpr = op.Ts
endTime, err = e.parseTsExpr(ctx, op.Ts)
if err != nil {
return
}
endTime = oracle.GetTimeFromTS(ts)
case ast.CalibrateDuration:
}
}
for _, op := range e.OptionList {
if op.Tp != ast.CalibrateDuration {
continue
}
// string duration
if len(op.StrValue) > 0 {
dur, err = duration.ParseDuration(op.StrValue)
if err != nil {
return
}
// If startTime is not set, startTime will be endTime - duration, or now() - duration when endTime is not set either.
if startTime.IsZero() {
toTime := endTime
if toTime.IsZero() {
toTime = time.Now()
}
startTime = toTime.Add(-dur)
}
// If endTime is set, duration will be ignored.
if endTime.IsZero() {
endTime = startTime.Add(dur)
}
continue
}
// interval duration
// If startTime is not set, startTime will be DATE_SUB(endTime, duration), or DATE_SUB(CURRENT_TIMESTAMP, duration) when endTime is not set either.
if startTimeExpr == nil {
toTimeExpr := endTimeExpr
if endTime.IsZero() {
toTimeExpr = &ast.FuncCallExpr{FnName: model.NewCIStr("CURRENT_TIMESTAMP")}
}
startTimeExpr = &ast.FuncCallExpr{
FnName: model.NewCIStr("DATE_SUB"),
Args: []ast.ExprNode{
toTimeExpr,
op.Ts,
&ast.TimeUnitExpr{Unit: op.Unit}},
}
startTime, err = e.parseTsExpr(ctx, startTimeExpr)
if err != nil {
return
}
}
// If endTime is set, duration will be ignored.
if endTime.IsZero() {
endTime, err = e.parseTsExpr(ctx, &ast.FuncCallExpr{
FnName: model.NewCIStr("DATE_ADD"),
Args: []ast.ExprNode{startTimeExpr,
op.Ts,
&ast.TimeUnitExpr{Unit: op.Unit}},
})
if err != nil {
return
}
}
}

if startTime.IsZero() {
err = errors.Errorf("start time should not be 0")
return
}
// If endTime is set, duration will be ignored.
if endTime.IsZero() {
if dur != time.Duration(0) {
endTime = startTime.Add(dur)
} else {
endTime = time.Now()
}
endTime = time.Now()
}
// check the duration
dur = endTime.Sub(startTime)
Expand All @@ -165,11 +225,11 @@ func (e *calibrateResourceExec) parseCalibrateDuration(ctx context.Context) (sta
if dur < minDuration {
err = errors.Errorf("the duration of calibration is too short, which could lead to inaccurate output. Please make the duration between %s and %s", minDuration.String(), maxDuration.String())
}

return
}

func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) error {
// Next implements the interface of Executor.
func (e *Executor) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.done {
return nil
Expand All @@ -178,7 +238,7 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro

exec := e.Ctx().(sqlexec.RestrictedSQLExecutor)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
if len(e.optionList) > 0 {
if len(e.OptionList) > 0 {
return e.dynamicCalibrate(ctx, req, exec)
}
return e.staticCalibrate(ctx, req, exec)
Expand All @@ -189,7 +249,7 @@ var (
errNoCPUQuotaMetrics = errors.Normalize("There is no CPU quota metrics, %v")
)

func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
startTs, endTs, err := e.parseCalibrateDuration(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -217,6 +277,32 @@ func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk
if err != nil {
return err
}
failpoint.Inject("mockMetricsDataFilter", func() {
ret := make([]*timePointValue, 0)
for _, point := range tikvCPUs.vals {
if point.tp.After(endTs) || point.tp.Before(startTs) {
continue
}
ret = append(ret, point)
}
tikvCPUs.vals = ret
ret = make([]*timePointValue, 0)
for _, point := range tidbCPUs.vals {
if point.tp.After(endTs) || point.tp.Before(startTs) {
continue
}
ret = append(ret, point)
}
tidbCPUs.vals = ret
ret = make([]*timePointValue, 0)
for _, point := range rus.vals {
if point.tp.After(endTs) || point.tp.Before(startTs) {
continue
}
ret = append(ret, point)
}
rus.vals = ret
})
quotas := make([]float64, 0)
lowCount := 0
for {
Expand Down Expand Up @@ -268,7 +354,7 @@ func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk
return nil
}

func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
if !variable.EnableResourceControl.Load() {
return infoschema.ErrResourceGroupSupportDisabled
}
Expand All @@ -288,12 +374,12 @@ func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.
}

// The default workload to calculate the RU capacity.
if e.workloadType == ast.WorkloadNone {
e.workloadType = ast.TPCC
if e.WorkloadType == ast.WorkloadNone {
e.WorkloadType = ast.TPCC
}
baseCost, ok := workloadBaseRUCostMap[e.workloadType]
baseCost, ok := workloadBaseRUCostMap[e.WorkloadType]
if !ok {
return errors.Errorf("unknown workload '%T'", e.workloadType)
return errors.Errorf("unknown workload '%T'", e.WorkloadType)
}

if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota {
Expand Down Expand Up @@ -359,12 +445,12 @@ func (t *timeSeriesValues) advance(target time.Time) bool {

func getRUPerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) {
query := fmt.Sprintf("SELECT time, value FROM METRICS_SCHEMA.resource_manager_resource_unit where time >= '%s' and time <= '%s' ORDER BY time asc", startTime, endTime)
return getValuesFromMetrics(ctx, sctx, exec, query, "resource_manager_resource_unit")
return getValuesFromMetrics(ctx, sctx, exec, query)
}

func getComponentCPUUsagePerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, component, startTime, endTime string) (*timeSeriesValues, error) {
query := fmt.Sprintf("SELECT time, sum(value) FROM METRICS_SCHEMA.process_cpu_usage where time >= '%s' and time <= '%s' and job like '%%%s' GROUP BY time ORDER BY time asc", startTime, endTime, component)
return getValuesFromMetrics(ctx, sctx, exec, query, "process_cpu_usage")
return getValuesFromMetrics(ctx, sctx, exec, query)
}

func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (float64, error) {
Expand All @@ -379,7 +465,7 @@ func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecuto
return rows[0].GetFloat64(0), nil
}

func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (*timeSeriesValues, error) {
func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, query string) (*timeSeriesValues, error) {
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query)
if err != nil {
return nil, errors.Trace(err)
Expand Down
Loading