Skip to content

Commit

Permalink
--wip-- [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed May 27, 2024
1 parent 4cd42b3 commit f004df1
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 25 deletions.
67 changes: 43 additions & 24 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
request := gc.collectRequestAndConsumption(typ)
if request != nil {
c.run.currentRequests = append(c.run.currentRequests, request)
gc.tokenRequestCounter.Inc()
gc.metrics.tokenRequestCounter.Inc()
}
return true
})
Expand Down Expand Up @@ -632,13 +632,9 @@ type groupCostController struct {
calculators []ResourceCalculator
handleRespFunc func(*rmpb.TokenBucketResponse)

successfulRequestDuration prometheus.Observer
failedLimitReserveDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounter prometheus.Counter
tokenRequestCounter prometheus.Counter

mu struct {
// metrics
metrics *metricsCollection
mu struct {
sync.Mutex
consumption *rmpb.Consumption
storeCounter map[uint64]*rmpb.Consumption
Expand Down Expand Up @@ -685,6 +681,30 @@ type groupCostController struct {
tombstone bool
}

// metricsCollection bundles the per-resource-group Prometheus handles used by
// a groupCostController, so the label lookup is done once (in initMetrics)
// rather than on every observation.
type metricsCollection struct {
	// Histogram-style observers for the time spent waiting on reservations,
	// recorded separately for successful and failed waits.
	successfulRequestDuration, failedLimitReserveDuration prometheus.Observer

	// Counters. The two failedRequestCounterWith* fields are children of the
	// same failedRequestCounter vector and differ only in the value of its
	// error-type label ("others" vs. "throttled").
	requestRetryCounter,
		failedRequestCounterWithOthers,
		failedRequestCounterWithThrottled,
		tokenRequestCounter prometheus.Counter
}

// initMetrics resolves every labelled metric child needed by one resource
// group and returns them as a metricsCollection.
//
// oldName and name are passed, in that order, as the first two label values
// of each metric vector. The failed-request counter takes one extra label
// value describing the failure kind, so two children of it are resolved: one
// for "others" and one for "throttled".
func initMetrics(oldName, name string) *metricsCollection {
	const (
		failureKindOthers    = "others"
		failureKindThrottled = "throttled"
	)

	// Lookups are performed in the same order as the fields were originally
	// populated.
	collection := new(metricsCollection)
	collection.successfulRequestDuration = successfulRequestDuration.WithLabelValues(oldName, name)
	collection.failedLimitReserveDuration = failedLimitReserveDuration.WithLabelValues(oldName, name)
	collection.failedRequestCounterWithOthers = failedRequestCounter.WithLabelValues(oldName, name, failureKindOthers)
	collection.failedRequestCounterWithThrottled = failedRequestCounter.WithLabelValues(oldName, name, failureKindThrottled)
	collection.requestRetryCounter = requestRetryCounter.WithLabelValues(oldName, name)
	collection.tokenRequestCounter = resourceGroupTokenRequestCounter.WithLabelValues(oldName, name)
	return collection
}

type tokenCounter struct {
getTokenBucketFunc func() *rmpb.TokenBucket

Expand Down Expand Up @@ -725,16 +745,13 @@ func newGroupCostController(
default:
return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type")
}
ms := initMetrics(group.Name, group.Name)
gc := &groupCostController{
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name, group.Name),
failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(group.Name, group.Name),
failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name, group.Name),
requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name, group.Name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name, group.Name),
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
metrics: ms,
calculators: []ResourceCalculator{
newKVCalculator(mainCfg),
newSQLCalculator(mainCfg),
Expand Down Expand Up @@ -1233,7 +1250,7 @@ func (gc *groupCostController) onRequestWait(
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
case rmpb.GroupMode_RUMode:
Expand All @@ -1243,18 +1260,20 @@ func (gc *groupCostController) onRequestWait(
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
}
gc.requestRetryCounter.Inc()
gc.metrics.requestRetryCounter.Inc()
time.Sleep(gc.mainCfg.WaitRetryInterval)
waitDuration += gc.mainCfg.WaitRetryInterval
}
if err != nil {
gc.failedRequestCounter.Inc()
if d.Seconds() > 0 {
gc.failedLimitReserveDuration.Observe(d.Seconds())
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
}
gc.mu.Lock()
sub(gc.mu.consumption, delta)
Expand All @@ -1264,7 +1283,7 @@ func (gc *groupCostController) onRequestWait(
})
return nil, nil, waitDuration, 0, err
}
gc.successfulRequestDuration.Observe(d.Seconds())
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

Expand Down
15 changes: 15 additions & 0 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/errs"
)

func createTestGroupCostController(re *require.Assertions) *groupCostController {
Expand Down Expand Up @@ -117,3 +118,17 @@ func TestRequestAndResponseConsumption(t *testing.T) {
re.Equal(expectedConsumption.TotalCpuTimeMs, consumption.TotalCpuTimeMs, caseNum)
}
}

// TestResourceGroupThrottledError checks that onRequestWait reports
// ErrClientResourceGroupThrottled for a very large write issued against the
// test group cost controller.
func TestResourceGroupThrottledError(t *testing.T) {
	re := require.New(t)

	controller := createTestGroupCostController(re)
	controller.initRunState()

	// A 10,000,000-byte write; the test expects the group to reject it as
	// throttled.
	largeWrite := &TestRequestInfo{isWrite: true, writeBytes: 10000000}

	_, _, _, _, err := controller.onRequestWait(context.TODO(), largeWrite)
	re.Error(err)
	re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
}
4 changes: 3 additions & 1 deletion client/resource_group/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
// TODO: remove old label in 8.x
resourceGroupNameLabel = "name"
newResourceGroupNameLabel = "resource_group"

errType = "type"
)

var (
Expand Down Expand Up @@ -59,7 +61,7 @@ var (
Subsystem: requestSubsystem,
Name: "fail",
Help: "Counter of failed request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel, errType})

requestRetryCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down

0 comments on commit f004df1

Please sign in to comment.