diff --git a/client/go.mod b/client/go.mod index 948a5c22b14..54be0c96765 100644 --- a/client/go.mod +++ b/client/go.mod @@ -13,6 +13,7 @@ require ( github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.11.1 github.com/stretchr/testify v1.8.2 + go.uber.org/atomic v1.10.0 go.uber.org/goleak v1.1.11 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 @@ -31,7 +32,6 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect - go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index 16a2525cd0d..ffc360c385c 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -88,6 +88,9 @@ type Config struct { // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` + + // EnableControllerTraceLog is to control whether resource control client enable trace. + EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"` } // DefaultConfig returns the default resource manager controller configuration. @@ -96,6 +99,7 @@ func DefaultConfig() *Config { DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), RequestUnit: DefaultRequestUnitConfig(), + EnableControllerTraceLog: false, } } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 56d9ef9bd1b..b2a47ba0636 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/client_golang/prometheus" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/errs" + atomicutil "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/exp/slices" ) @@ -54,6 +55,14 @@ const ( lowToken selectType = 1 ) +var enableControllerTraceLog = atomicutil.NewBool(false) + +func logControllerTrace(msg string, fields ...zap.Field) { + if enableControllerTraceLog.Load() { + log.Info(msg, fields...) + } +} + // ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store. type ResourceGroupKVInterceptor interface { // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. @@ -369,6 +378,9 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } copyCfg := *c.ruConfig c.safeRuConfig.Store(©Cfg) + if enableControllerTraceLog.Load() != config.EnableControllerTraceLog { + enableControllerTraceLog.Store(config.EnableControllerTraceLog) + } log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig)) } @@ -505,7 +517,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, c.responseDeadlineCh = c.run.responseDeadline.C } go func() { - log.Debug("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source)) + logControllerTrace("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source)) resp, err := c.provider.AcquireTokenBuckets(ctx, req) latency := time.Since(now) if err != nil { @@ -518,7 +530,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, } else { successfulTokenRequestDuration.Observe(latency.Seconds()) } - log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency)) + logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency)) c.tokenResponseChan <- resp }() } @@ -603,10 +615,11 @@ type groupCostController struct { calculators []ResourceCalculator handleRespFunc func(*rmpb.TokenBucketResponse) - successfulRequestDuration prometheus.Observer - requestRetryCounter prometheus.Counter - failedRequestCounter prometheus.Counter - tokenRequestCounter prometheus.Counter + successfulRequestDuration prometheus.Observer + failedLimitReserveDuration prometheus.Observer + requestRetryCounter prometheus.Counter + failedRequestCounter prometheus.Counter + tokenRequestCounter prometheus.Counter mu struct { sync.Mutex @@ -696,14 +709,15 @@ func newGroupCostController( return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type") } gc := &groupCostController{ - meta: group, - name: group.Name, - mainCfg: mainCfg, - mode: group.GetMode(), - successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name), - failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name), - requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name), - tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name), + meta: group, + name: group.Name, + mainCfg: mainCfg, + mode: group.GetMode(), + successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name), + failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(group.Name), + failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name), + requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name), + tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name), calculators: []ResourceCalculator{ newKVCalculator(mainCfg), newSQLCalculator(mainCfg), @@ -805,7 +819,7 @@ func (gc *groupCostController) updateRunState() { } *gc.run.consumption = *gc.mu.consumption gc.mu.Unlock() - log.Debug("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption)) + logControllerTrace("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption)) gc.run.now = newTime } @@ -886,7 +900,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() { if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) { continue } - log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec)) + logControllerTrace("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec)) } gc.burstable.Store(isBurstable) } @@ -900,7 +914,7 @@ func (gc *groupCostController) updateAvgRUPerSec() { if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) { continue } - log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec)) + logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec)) } gc.burstable.Store(isBurstable) } @@ -1204,6 +1218,8 @@ func (gc *groupCostController) onRequestWait( } if d, err = WaitReservations(ctx, now, res); err == nil { break retryLoop + } else if d.Seconds() > 0 { + gc.failedLimitReserveDuration.Observe(d.Seconds()) } case rmpb.GroupMode_RUMode: res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) @@ -1214,6 +1230,8 @@ func (gc *groupCostController) onRequestWait( } if d, err = WaitReservations(ctx, now, res); err == nil { break retryLoop + } else if d.Seconds() > 0 { + gc.failedLimitReserveDuration.Observe(d.Seconds()) } } gc.requestRetryCounter.Inc() diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index f89ab17514c..63c94a9782b 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -122,10 +122,11 @@ func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensN // A Reservation holds information about events that are permitted by a Limiter to happen after a delay. // A Reservation may be canceled, which may enable the Limiter to permit additional events. type Reservation struct { - ok bool - lim *Limiter - tokens float64 - timeToAct time.Time + ok bool + lim *Limiter + tokens float64 + timeToAct time.Time + needWaitDurtion time.Duration // This is the Limit at reservation time, it can change later. limit Limit } @@ -301,7 +302,7 @@ func (lim *Limiter) Reconfigure(now time.Time, ) { lim.mu.Lock() defer lim.mu.Unlock() - log.Debug("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst)) + logControllerTrace("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst)) if args.NewBurst < 0 { lim.last = now lim.tokens = args.NewTokens @@ -317,7 +318,7 @@ func (lim *Limiter) Reconfigure(now time.Time, opt(lim) } lim.maybeNotify() - log.Debug("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst)) + logControllerTrace("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst)) } // AvailableTokens decreases the amount of tokens currently available. @@ -358,9 +359,10 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur // Prepare reservation r := Reservation{ - ok: ok, - lim: lim, - limit: lim.limit, + ok: ok, + lim: lim, + limit: lim.limit, + needWaitDurtion: waitDuration, } if ok { r.tokens = n @@ -372,7 +374,14 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur lim.tokens = tokens lim.maybeNotify() } else { - log.Debug("[resource group controller]", zap.Float64("current-tokens", lim.tokens), zap.Float64("current-rate", float64(lim.limit)), zap.Float64("request-tokens", n), zap.Int64("burst", lim.burst), zap.Int("remaining-notify-times", lim.remainingNotifyTimes)) + log.Warn("[resource group controller] cannot reserve enough tokens", + zap.Duration("need-wait-duration", waitDuration), + zap.Duration("max-wait-duration", maxFutureReserve), + zap.Float64("current-ltb-tokens", lim.tokens), + zap.Float64("current-ltb-rate", float64(lim.limit)), + zap.Float64("request-tokens", n), + zap.Int64("burst", lim.burst), + zap.Int("remaining-notify-times", lim.remainingNotifyTimes)) lim.last = last if lim.limit == 0 { lim.notify() @@ -452,7 +461,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv for _, res := range reservations { if !res.ok { cancel() - return 0, errs.ErrClientResourceGroupThrottled + return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled } delay := res.DelayFrom(now) if delay > longestDelayDuration { diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index b8b96ae13d6..d963f830551 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -161,7 +161,7 @@ func TestCancel(t *testing.T) { checkTokens(re, lim1, t2, 7) checkTokens(re, lim2, t2, 2) d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2}) - re.Equal(d, time.Duration(0)) + re.Equal(d, 4*time.Second) re.Error(err) checkTokens(re, lim1, t3, 13) checkTokens(re, lim2, t3, 3) diff --git a/client/resource_group/controller/metrics.go b/client/resource_group/controller/metrics.go index 68eb26d0312..47e285ad775 100644 --- a/client/resource_group/controller/metrics.go +++ b/client/resource_group/controller/metrics.go @@ -42,6 +42,15 @@ var ( Help: "Bucketed histogram of wait duration of successful request.", }, []string{resourceGroupNameLabel}) + failedLimitReserveDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "limit_reserve_time_failed", + Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30 + Help: "Bucketed histogram of wait duration of failed request.", + }, []string{resourceGroupNameLabel}) + failedRequestCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, diff --git a/pkg/mcs/resourcemanager/server/config.go b/pkg/mcs/resourcemanager/server/config.go index 10e91612842..bcd5a853dfc 100644 --- a/pkg/mcs/resourcemanager/server/config.go +++ b/pkg/mcs/resourcemanager/server/config.go @@ -102,6 +102,9 @@ type ControllerConfig struct { // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` + + // EnableControllerTraceLog is to control whether resource control client enable trace. + EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"` } // Adjust adjusts the configuration and initializes it with the default value if necessary. diff --git a/tests/pdctl/resourcemanager/resource_manager_command_test.go b/tests/pdctl/resourcemanager/resource_manager_command_test.go new file mode 100644 index 00000000000..ad43e0abca9 --- /dev/null +++ b/tests/pdctl/resourcemanager/resource_manager_command_test.go @@ -0,0 +1,97 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resourcemanager_test + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/mcs/resourcemanager/server" + "github.com/tikv/pd/pkg/utils/typeutil" + "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" +) + +func TestResourceManagerSuite(t *testing.T) { + suite.Run(t, new(testResourceManagerSuite)) +} + +type testResourceManagerSuite struct { + suite.Suite + ctx context.Context + cancel context.CancelFunc + cluster *tests.TestCluster + pdAddr string +} + +func (s *testResourceManagerSuite) SetupSuite() { + s.ctx, s.cancel = context.WithCancel(context.Background()) + cluster, err := tests.NewTestCluster(s.ctx, 1) + s.Nil(err) + s.cluster = cluster + s.cluster.RunInitialServers() + cluster.WaitLeader() + s.pdAddr = cluster.GetConfig().GetClientURL() +} + +func (s *testResourceManagerSuite) TearDownSuite() { + s.cancel() + s.cluster.Destroy() +} + +func (s *testResourceManagerSuite) TestConfigController() { + expectCfg := server.ControllerConfig{} + expectCfg.Adjust(nil) + // Show controller config + checkShow := func() { + args := []string{"-u", s.pdAddr, "resource-manager", "config", "controller", "show"} + output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + s.Nil(err) + + actualCfg := server.ControllerConfig{} + err = json.Unmarshal(output, &actualCfg) + s.Nil(err) + s.Equal(expectCfg, actualCfg) + } + + // Check default config + checkShow() + + // Set controller config + args := []string{"-u", s.pdAddr, "resource-manager", "config", "controller", "set", "ltb-max-wait-duration", "1h"} + output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + s.Nil(err) + s.Contains(string(output), "Success!") + expectCfg.LTBMaxWaitDuration = typeutil.Duration{Duration: 1 * time.Hour} + checkShow() + + args = []string{"-u", s.pdAddr, "resource-manager", "config", "controller", "set", "enable-controller-trace-log", "true"} + output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + s.Nil(err) + s.Contains(string(output), "Success!") + expectCfg.EnableControllerTraceLog = true + checkShow() + + args = []string{"-u", s.pdAddr, "resource-manager", "config", "controller", "set", "write-base-cost", "2"} + output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) + s.Nil(err) + s.Contains(string(output), "Success!") + expectCfg.RequestUnit.WriteBaseCost = 2 + checkShow() +} diff --git a/tools/pd-ctl/pdctl/command/resource_manager_command.go b/tools/pd-ctl/pdctl/command/resource_manager_command.go new file mode 100644 index 00000000000..8bc5ea85977 --- /dev/null +++ b/tools/pd-ctl/pdctl/command/resource_manager_command.go @@ -0,0 +1,112 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "strconv" + + "github.com/spf13/cobra" +) + +const ( + resourceManagerPrefix = "resource-manager/api/v1" + // flags + rmConfigController = "config/controller" +) + +// NewResourceManagerCommand return a resource manager subcommand of rootCmd +func NewResourceManagerCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "resource-manager [flags]", + Short: "resource-manager commands", + } + cmd.AddCommand(newResourceManagerConfigCommand()) + return cmd +} + +func newResourceManagerConfigCommand() *cobra.Command { + r := &cobra.Command{ + Use: "config", + Short: "config resource manager", + } + r.AddCommand(newConfigControllerCommand()) + return r +} + +func newConfigControllerCommand() *cobra.Command { + r := &cobra.Command{ + Use: "controller", + Short: "config controller", + } + r.AddCommand(newConfigControllerSetCommand()) + r.AddCommand(newConfigControllerShowCommand()) + return r +} + +func newConfigControllerSetCommand() *cobra.Command { + r := &cobra.Command{ + Use: "set ", + Short: "set controller config", + Run: func(cmd *cobra.Command, args []string) { + if len(args) != 2 { + cmd.Println(cmd.UsageString()) + return + } + + var val interface{} + val, err := strconv.ParseFloat(args[1], 64) + if err != nil { + val = args[1] + } + data := map[string]interface{}{args[0]: val} + jsonData, err := json.Marshal(data) + if err != nil { + cmd.Println(err) + return + } + resp, err := doRequest(cmd, fmt.Sprintf("%s/%s", resourceManagerPrefix, rmConfigController), http.MethodPost, http.Header{}, WithBody(bytes.NewBuffer(jsonData))) + if err != nil { + cmd.PrintErrln("Failed to set the config: ", err) + return + } + cmd.Println(resp) + }, + } + return r +} + +func newConfigControllerShowCommand() *cobra.Command { + r := &cobra.Command{ + Use: "show", + Short: "show controller config", + Run: func(cmd *cobra.Command, args []string) { + if len(args) != 0 { + cmd.Println(cmd.UsageString()) + return + } + resp, err := doRequest(cmd, fmt.Sprintf("%s/%s", resourceManagerPrefix, rmConfigController), http.MethodGet, http.Header{}) + if err != nil { + cmd.Println(err) + return + } + cmd.Println(resp) + }, + } + return r +} diff --git a/tools/pd-ctl/pdctl/ctl.go b/tools/pd-ctl/pdctl/ctl.go index 86494c046eb..7a3c540b266 100644 --- a/tools/pd-ctl/pdctl/ctl.go +++ b/tools/pd-ctl/pdctl/ctl.go @@ -67,6 +67,7 @@ func GetRootCmd() *cobra.Command { command.NewUnsafeCommand(), command.NewKeyspaceGroupCommand(), command.NewKeyspaceCommand(), + command.NewResourceManagerCommand(), ) rootCmd.Flags().ParseErrorsWhitelist.UnknownFlags = true