Skip to content

Commit

Permalink
resource-manager: improve trace logs, ctl and metrics
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Dec 8, 2023
1 parent ad232d1 commit 3f9bff5
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 30 deletions.
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
Expand All @@ -31,7 +32,6 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type Config struct {
// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`

// EnableControllerTraceLog is to control whether resource control client enable trace.
EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"`
}

// DefaultConfig returns the default resource manager controller configuration.
Expand All @@ -96,6 +99,7 @@ func DefaultConfig() *Config {
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
RequestUnit: DefaultRequestUnitConfig(),
EnableControllerTraceLog: false,
}
}

Expand Down
53 changes: 36 additions & 17 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
Expand All @@ -54,6 +55,14 @@ const (
lowToken selectType = 1
)

var enableControllerTraceLog = atomicutil.NewBool(false)

func logControllerTrace(msg string, fields ...zap.Field) {
if enableControllerTraceLog.Load() {
log.Info(msg, fields...)
}
}

// ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store.
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
Expand Down Expand Up @@ -369,6 +378,9 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
copyCfg := *c.ruConfig
c.safeRuConfig.Store(&copyCfg)
if enableControllerTraceLog.Load() != config.EnableControllerTraceLog {
enableControllerTraceLog.Store(config.EnableControllerTraceLog)
}
log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig))
}

Expand Down Expand Up @@ -505,7 +517,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
c.responseDeadlineCh = c.run.responseDeadline.C
}
go func() {
log.Debug("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
logControllerTrace("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
resp, err := c.provider.AcquireTokenBuckets(ctx, req)
latency := time.Since(now)
if err != nil {
Expand All @@ -518,7 +530,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
} else {
successfulTokenRequestDuration.Observe(latency.Seconds())
}
log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency))
logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency))
c.tokenResponseChan <- resp
}()
}
Expand Down Expand Up @@ -603,10 +615,11 @@ type groupCostController struct {
calculators []ResourceCalculator
handleRespFunc func(*rmpb.TokenBucketResponse)

successfulRequestDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounter prometheus.Counter
tokenRequestCounter prometheus.Counter
successfulRequestDuration prometheus.Observer
failedLimitReserveDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounter prometheus.Counter
tokenRequestCounter prometheus.Counter

mu struct {
sync.Mutex
Expand Down Expand Up @@ -696,14 +709,15 @@ func newGroupCostController(
return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type")
}
gc := &groupCostController{
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name),
failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name),
requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name),
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name),
failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(group.Name),
failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name),
requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name),
calculators: []ResourceCalculator{
newKVCalculator(mainCfg),
newSQLCalculator(mainCfg),
Expand Down Expand Up @@ -805,7 +819,7 @@ func (gc *groupCostController) updateRunState() {
}
*gc.run.consumption = *gc.mu.consumption
gc.mu.Unlock()
log.Debug("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption))
logControllerTrace("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption))
gc.run.now = newTime
}

Expand Down Expand Up @@ -886,7 +900,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() {
if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
logControllerTrace("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
}
gc.burstable.Store(isBurstable)
}
Expand All @@ -900,7 +914,7 @@ func (gc *groupCostController) updateAvgRUPerSec() {
if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
}
gc.burstable.Store(isBurstable)
}
Expand Down Expand Up @@ -1204,6 +1218,9 @@ func (gc *groupCostController) onRequestWait(
}
if d, err = WaitReservations(ctx, now, res); err == nil {
break retryLoop
} else if d.Seconds() > 0 {
log.Error("failed to reserve resource", zap.Duration("need-wait-duration", d), zap.Error(errs.ErrClientResourceGroupThrottled))
gc.failedLimitReserveDuration.Observe(d.Seconds())
}
case rmpb.GroupMode_RUMode:
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
Expand All @@ -1214,6 +1231,8 @@ func (gc *groupCostController) onRequestWait(
}
if d, err = WaitReservations(ctx, now, res); err == nil {
break retryLoop
} else if d.Seconds() > 0 {
gc.failedLimitReserveDuration.Observe(d.Seconds())
}
}
gc.requestRetryCounter.Inc()
Expand Down
31 changes: 20 additions & 11 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,11 @@ func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensN
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
needWaitDurtion time.Duration
// This is the Limit at reservation time, it can change later.
limit Limit
}
Expand Down Expand Up @@ -301,7 +302,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
) {
lim.mu.Lock()
defer lim.mu.Unlock()
log.Debug("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst))
logControllerTrace("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst))
if args.NewBurst < 0 {
lim.last = now
lim.tokens = args.NewTokens
Expand All @@ -317,7 +318,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
opt(lim)
}
lim.maybeNotify()
log.Debug("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
logControllerTrace("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
}

// AvailableTokens decreases the amount of tokens currently available.
Expand Down Expand Up @@ -358,9 +359,10 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur

// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
ok: ok,
lim: lim,
limit: lim.limit,
needWaitDurtion: waitDuration,
}
if ok {
r.tokens = n
Expand All @@ -372,7 +374,14 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
lim.tokens = tokens
lim.maybeNotify()
} else {
log.Debug("[resource group controller]", zap.Float64("current-tokens", lim.tokens), zap.Float64("current-rate", float64(lim.limit)), zap.Float64("request-tokens", n), zap.Int64("burst", lim.burst), zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
log.Warn("[resource group controller] cannot reserve enough tokens",
zap.Duration("need-wait-duration", waitDuration),
zap.Duration("max-wait-duration", maxFutureReserve),
zap.Float64("current-ltb-tokens", lim.tokens),
zap.Float64("current-ltb-rate", float64(lim.limit)),
zap.Float64("request-tokens", n),
zap.Int64("burst", lim.burst),
zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
lim.last = last
if lim.limit == 0 {
lim.notify()
Expand Down Expand Up @@ -452,7 +461,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
for _, res := range reservations {
if !res.ok {
cancel()
return 0, errs.ErrClientResourceGroupThrottled
return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled
}
delay := res.DelayFrom(now)
if delay > longestDelayDuration {
Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestCancel(t *testing.T) {
checkTokens(re, lim1, t2, 7)
checkTokens(re, lim2, t2, 2)
d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
re.Equal(d, time.Duration(0))
re.Equal(d, 4*time.Second)
re.Error(err)
checkTokens(re, lim1, t3, 13)
checkTokens(re, lim2, t3, 3)
Expand Down
9 changes: 9 additions & 0 deletions client/resource_group/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ var (
Help: "Bucketed histogram of wait duration of successful request.",
}, []string{resourceGroupNameLabel})

failedLimitReserveDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "limit_reserve_time_failed",
Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30
Help: "Bucketed histogram of wait duration of failed request.",
}, []string{resourceGroupNameLabel})

failedRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/resourcemanager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ type ControllerConfig struct {
// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`

// EnableControllerTraceLog is to control whether resource control client enable trace.
EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"`
}

// Adjust adjusts the configuration and initializes it with the default value if necessary.
Expand Down
97 changes: 97 additions & 0 deletions tests/pdctl/resourcemanager/resource_manager_command_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package resourcemanager_test

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/resourcemanager/server"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/pdctl"
pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl"
)

func TestResourceManagerSuite(t *testing.T) {
suite.Run(t, new(testResourceManagerSuite))
}

type testResourceManagerSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
cluster *tests.TestCluster
pdAddr string
}

func (s *testResourceManagerSuite) SetupSuite() {
s.ctx, s.cancel = context.WithCancel(context.Background())
cluster, err := tests.NewTestCluster(s.ctx, 1)
s.Nil(err)
s.cluster = cluster
s.cluster.RunInitialServers()
cluster.WaitLeader()
s.pdAddr = cluster.GetConfig().GetClientURL()
}

func (s *testResourceManagerSuite) TearDownSuite() {
s.cancel()
s.cluster.Destroy()
}

func (s *testResourceManagerSuite) TestConfigController() {
expectCfg := server.ControllerConfig{}
expectCfg.Adjust(nil)
// Show controller config
checkShow := func() {
args := []string{"-u", s.pdAddr, "resource-manager", "config", "controller", "show"}
output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...)
s.Nil(err)

actualCfg := server.ControllerConfig{}
err = json.Unmarshal(output, &actualCfg)
s.Nil(err)
s.Equal(expectCfg, actualCfg)
}

// Check default config
checkShow()

// Set controller config
args := []string{"-u", s.pdAddr, "resource-manager", "config", "controller", "set", "ltb-max-wait-duration", "1h"}
output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...)
s.Nil(err)
s.Contains(string(output), "Success!")
expectCfg.LTBMaxWaitDuration = typeutil.Duration{Duration: 1 * time.Hour}
checkShow()

args = []string{"-u", s.pdAddr, "resource-manager", "config", "controller", "set", "enable-controller-trace-log", "true"}
output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...)
s.Nil(err)
s.Contains(string(output), "Success!")
expectCfg.EnableControllerTraceLog = true
checkShow()

args = []string{"-u", s.pdAddr, "resource-manager", "config", "controller", "set", "write-base-cost", "2"}
output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...)
s.Nil(err)
s.Contains(string(output), "Success!")
expectCfg.RequestUnit.WriteBaseCost = 2
checkShow()
}
Loading

0 comments on commit 3f9bff5

Please sign in to comment.